diff --git a/cpp/src/arrow/public_api_test.cc b/cpp/src/arrow/public_api_test.cc
index 875d07d8152..25e43d3b9b3 100644
--- a/cpp/src/arrow/public_api_test.cc
+++ b/cpp/src/arrow/public_api_test.cc
@@ -30,10 +30,6 @@
 #error "ASSIGN_OR_RAISE should not be visible from Arrow public headers."
 #endif
 
-#ifdef ARROW_UTIL_PARALLEL_H
-#error "arrow/util/parallel.h is an internal header"
-#endif
-
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index f5c658d08f2..718307deedf 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -43,6 +43,7 @@ add_arrow_test(utility-test
                align_util_test.cc
                bit_block_counter_test.cc
                bit_util_test.cc
+               cache_test.cc
                checked_cast_test.cc
                compression_test.cc
                decimal_test.cc
@@ -73,6 +74,7 @@ add_arrow_test(threading-utility-test
 
 add_arrow_benchmark(bit_block_counter_benchmark)
 add_arrow_benchmark(bit_util_benchmark)
+add_arrow_benchmark(cache_benchmark)
 add_arrow_benchmark(compression_benchmark)
 add_arrow_benchmark(decimal_benchmark)
 add_arrow_benchmark(hashing_benchmark)
diff --git a/cpp/src/arrow/util/cache_benchmark.cc b/cpp/src/arrow/util/cache_benchmark.cc
new file mode 100644
index 00000000000..7439ee2f501
--- /dev/null
+++ b/cpp/src/arrow/util/cache_benchmark.cc
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/cache_internal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace internal {
+
+static constexpr int32_t kCacheSize = 100;
+static constexpr int32_t kSmallKeyLength = 8;
+static constexpr int32_t kLargeKeyLength = 64;
+static constexpr int32_t kSmallValueLength = 16;
+static constexpr int32_t kLargeValueLength = 1024;
+
+static std::vector<std::string> MakeStrings(int64_t nvalues, int64_t min_length,
+                                            int64_t max_length) {
+  auto rng = ::arrow::random::RandomArrayGenerator(42);
+  auto arr = checked_pointer_cast<StringArray>(rng.String(
+      nvalues, static_cast<int32_t>(min_length), static_cast<int32_t>(max_length)));
+  std::vector<std::string> vec(nvalues);
+  for (int64_t i = 0; i < nvalues; ++i) {
+    vec[i] = arr->GetString(i);
+  }
+  return vec;
+}
+
+static std::vector<std::string> MakeStrings(int64_t nvalues, int64_t length) {
+  return MakeStrings(nvalues, length, length);
+}
+
+template <typename Cache>
+static void BenchmarkCacheLookups(benchmark::State& state,
+                                  const std::vector<std::string>& keys,
+                                  const std::vector<std::string>& values) {
+  const int32_t nitems = static_cast<int32_t>(keys.size());
+  Cache cache(nitems);
+  for (int32_t i = 0; i < nitems; ++i) {
+    cache.Replace(keys[i], values[i]);
+  }
+
+  for (auto _ : state) {
+    int64_t nfinds = 0;
+    for (const auto& key : keys) {
+      nfinds += (cache.Find(key) != nullptr);
+    }
+    benchmark::DoNotOptimize(nfinds);
+    ARROW_CHECK_EQ(nfinds, nitems);
+  }
+  state.SetItemsProcessed(state.iterations() * nitems);
+}
+
+static void LruCacheLookup(benchmark::State& state) {
+  const auto keys = MakeStrings(kCacheSize, state.range(0));
+  const auto values = MakeStrings(kCacheSize, state.range(1));
+  BenchmarkCacheLookups<LruCache<std::string, std::string>>(state, keys, values);
+}
+
+static void SetCacheArgs(benchmark::internal::Benchmark* bench) {
+  bench->Args({kSmallKeyLength, kSmallValueLength});
+  bench->Args({kSmallKeyLength, kLargeValueLength});
+  bench->Args({kLargeKeyLength, kSmallValueLength});
+  bench->Args({kLargeKeyLength, kLargeValueLength});
+}
+
+BENCHMARK(LruCacheLookup)->Apply(SetCacheArgs);
+
+struct Callable {
+  explicit Callable(std::vector<std::string> values)
+      : index_(0), values_(std::move(values)) {}
+
+  std::string operator()(const std::string& key) {
+    // Return a value unrelated to the key
+    if (++index_ >= static_cast<int64_t>(values_.size())) {
+      index_ = 0;
+    }
+    return values_[index_];
+  }
+
+ private:
+  int64_t index_;
+  std::vector<std::string> values_;
+};
+
+template <typename Memoized>
+static void BenchmarkMemoize(benchmark::State& state, Memoized&& mem,
+                             const std::vector<std::string>& keys) {
+  // Prime memoization cache
+  for (const auto& key : keys) {
+    mem(key);
+  }
+
+  for (auto _ : state) {
+    int64_t nbytes = 0;
+    for (const auto& key : keys) {
+      nbytes += static_cast<int64_t>(mem(key).length());
+    }
+    benchmark::DoNotOptimize(nbytes);
+  }
+  state.SetItemsProcessed(state.iterations() * keys.size());
+}
+
+static void MemoizeLruCached(benchmark::State& state) {
+  const auto keys = MakeStrings(kCacheSize, state.range(0));
+  const auto values = MakeStrings(kCacheSize, state.range(1));
+  auto mem = MemoizeLru(Callable(values), kCacheSize);
+  BenchmarkMemoize(state, mem, keys);
+}
+
+static void MemoizeLruCachedThreadUnsafe(benchmark::State& state) {
+  const auto keys = MakeStrings(kCacheSize, state.range(0));
+  const auto values = MakeStrings(kCacheSize, state.range(1));
+  // Emulate recommended usage of MemoizeLruCachedThreadUnsafe
+  // (the compiler is probably able to cache the TLS-looked up
+  // value, though)
+  thread_local auto mem = MemoizeLruThreadUnsafe(Callable(values), kCacheSize);
+  BenchmarkMemoize(state, mem, keys);
+}
+
+BENCHMARK(MemoizeLruCached)->Apply(SetCacheArgs);
+BENCHMARK(MemoizeLruCachedThreadUnsafe)->Apply(SetCacheArgs);
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/cache_internal.h b/cpp/src/arrow/util/cache_internal.h
new file mode 100644
index 00000000000..231fd800b67
--- /dev/null
+++ b/cpp/src/arrow/util/cache_internal.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace internal {
+
+// A LRU (Least recently used) replacement cache
+template <typename Key, typename Value>
+class LruCache {
+ public:
+  explicit LruCache(int32_t capacity) : capacity_(capacity) {
+    // The map size can temporarily exceed the cache capacity, see Replace()
+    map_.reserve(capacity_ + 1);
+  }
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(LruCache);
+  ARROW_DEFAULT_MOVE_AND_ASSIGN(LruCache);
+
+  void Clear() {
+    items_.clear();
+    map_.clear();
+    // The C++ spec doesn't tell whether map_.clear() will shrink the map capacity
+    map_.reserve(capacity_ + 1);
+  }
+
+  int32_t size() const {
+    DCHECK_EQ(items_.size(), map_.size());
+    return static_cast<int32_t>(items_.size());
+  }
+
+  template <typename K>
+  Value* Find(K&& key) {
+    const auto it = map_.find(key);
+    if (it == map_.end()) {
+      return nullptr;
+    } else {
+      // Found => move item at front of the list
+      auto list_it = it->second;
+      items_.splice(items_.begin(), items_, list_it);
+      return &list_it->value;
+    }
+  }
+
+  template <typename K, typename V>
+  std::pair<bool, Value*> Replace(K&& key, V&& value) {
+    // Try to insert temporary iterator
+    auto pair = map_.emplace(std::forward<K>(key), ListIt{});
+    const auto it = pair.first;
+    const bool inserted = pair.second;
+    if (inserted) {
+      // Inserted => push item at front of the list, and update iterator
+      items_.push_front(Item{&it->first, std::forward<V>(value)});
+      it->second = items_.begin();
+      // Did we exceed the cache capacity?
+      // If so, remove least recently used item
+      if (static_cast<int32_t>(items_.size()) > capacity_) {
+        const bool erased = map_.erase(*items_.back().key);
+        DCHECK(erased);
+        ARROW_UNUSED(erased);
+        items_.pop_back();
+      }
+      return {true, &it->second->value};
+    } else {
+      // Already exists => move item at front of the list, and update value
+      auto list_it = it->second;
+      items_.splice(items_.begin(), items_, list_it);
+      list_it->value = std::forward<V>(value);
+      return {false, &list_it->value};
+    }
+  }
+
+ private:
+  struct Item {
+    // Pointer to the key inside the unordered_map
+    const Key* key;
+    Value value;
+  };
+  using List = std::list<Item>;
+  using ListIt = typename List::iterator;
+
+  const int32_t capacity_;
+  // In most to least recently used order
+  std::list<Item> items_;
+  std::unordered_map<Key, ListIt> map_;
+};
+
+namespace detail {
+
+template <typename Key, typename Value, typename Cache, typename Func>
+struct ThreadSafeMemoizer {
+  using RetType = Value;
+
+  template <typename F>
+  ThreadSafeMemoizer(F&& func, int32_t cache_capacity)
+      : func_(std::forward<F>(func)), cache_(cache_capacity) {}
+
+  // The memoizer can't return a pointer to the cached value, because
+  // the cache entry may be evicted by another thread.
+
+  Value operator()(const Key& key) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    const Value* value_ptr;
+    value_ptr = cache_.Find(key);
+    if (ARROW_PREDICT_TRUE(value_ptr != nullptr)) {
+      return *value_ptr;
+    }
+    lock.unlock();
+    Value v = func_(key);
+    lock.lock();
+    return *cache_.Replace(key, std::move(v)).second;
+  }
+
+ private:
+  std::mutex mutex_;
+  Func func_;
+  Cache cache_;
+};
+
+template <typename Key, typename Value, typename Cache, typename Func>
+struct ThreadUnsafeMemoizer {
+  using RetType = const Value&;
+
+  template <typename F>
+  ThreadUnsafeMemoizer(F&& func, int32_t cache_capacity)
+      : func_(std::forward<F>(func)), cache_(cache_capacity) {}
+
+  const Value& operator()(const Key& key) {
+    const Value* value_ptr;
+    value_ptr = cache_.Find(key);
+    if (ARROW_PREDICT_TRUE(value_ptr != nullptr)) {
+      return *value_ptr;
+    }
+    return *cache_.Replace(key, func_(key)).second;
+  }
+
+ private:
+  Func func_;
+  Cache cache_;
+};
+
+template