diff --git a/CMakeLists.txt b/CMakeLists.txt index b53f5980..f03c53f4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -674,6 +674,7 @@ set(LIBPARQUET_SRCS src/parquet/arrow/record_reader.cc src/parquet/arrow/schema.cc src/parquet/arrow/writer.cc + src/parquet/bloom_filter.cc src/parquet/column_reader.cc src/parquet/column_scanner.cc src/parquet/column_writer.cc @@ -681,6 +682,7 @@ set(LIBPARQUET_SRCS src/parquet/file_reader.cc src/parquet/file_writer.cc src/parquet/metadata.cc + src/parquet/murmur3.cc src/parquet/parquet_constants.cpp src/parquet/parquet_types.cpp src/parquet/printer.cc diff --git a/data/bloom_filter.bin b/data/bloom_filter.bin new file mode 100644 index 00000000..c0e30ce7 Binary files /dev/null and b/data/bloom_filter.bin differ diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index bc16d8bd..93a242c6 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -17,6 +17,7 @@ # Headers: top level install(FILES + bloom_filter.h column_reader.h column_page.h column_scanner.h @@ -25,7 +26,9 @@ install(FILES exception.h file_reader.h file_writer.h + hasher.h metadata.h + murmur3.h printer.h properties.h schema.h @@ -50,6 +53,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") +ADD_PARQUET_TEST(bloom_filter-test) ADD_PARQUET_TEST(column_reader-test) ADD_PARQUET_TEST(column_scanner-test) ADD_PARQUET_TEST(column_writer-test) diff --git a/src/parquet/README b/src/parquet/README new file mode 100644 index 00000000..fc16a46c --- /dev/null +++ b/src/parquet/README @@ -0,0 +1,10 @@ +The CompatibilityTest of bloom_filter-test.cc is used to test cross compatibility of +Bloom filters between parquet-mr and parquet-cpp. It reads the Bloom filter binary +generated by the Bloom filter class in the parquet-mr project and tests whether the +values inserted before could be filtered or not. 
+ +The Bloom filter binary is generated by three steps from Parquet-mr: +Step 1: Construct a Bloom filter with 1024 bytes of bitset. +Step 2: Insert hashes of "hello", "parquet", "bloom", "filter" strings to Bloom filter +by calling hash and insert APIs. +Step 3: Call writeTo API to write to File. diff --git a/src/parquet/bloom_filter-test.cc b/src/parquet/bloom_filter-test.cc new file mode 100644 index 00000000..dbef8c8b --- /dev/null +++ b/src/parquet/bloom_filter-test.cc @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <random>
+#include <string>
+
+#include "arrow/io/file.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/murmur3.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+namespace test {
+TEST(Murmur3Test, TestBloomFilter) {
+  uint64_t result;
+  const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
+  ByteArray byteArray(8, bitset);
+  MurmurHash3 murmur3;
+  result = murmur3.Hash(&byteArray);
+  EXPECT_EQ(result, UINT64_C(913737700387071329));
+}
+
+TEST(ConstructorTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  EXPECT_NO_THROW(bloom_filter.Init(1000));
+
+  // It throws because the length cannot be zero
+  std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);
+
+  // It throws because the number of bytes of Bloom filter bitset must be a power of 2.
+  std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(1024);
+
+  for (int i = 0; i < 10; i++) {
+    bloom_filter.InsertHash(bloom_filter.Hash(i));
+  }
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter.WriteTo(&sink);
+
+  // Deserialize Bloom filter from memory
+  InMemoryInputStream source(sink.GetBuffer());
+
+  BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source);
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+  }
+}
+
+// Helper function to generate random string.
+std::string GetRandomString(uint32_t length) {
+  // Character set used to generate random string
+  const std::string charset =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  // The uuid_seed was generated by "uuidgen -r"
+  const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
+  std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
+  // Seeded only once: re-seeding on every call would return the same string each time.
+  static std::mt19937 generator(seed);
+  std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1));
+  std::string ret = "";
+  for (uint32_t i = 0; i < length; i++) {
+    ret += charset[dist(generator)];
+  }
+
+  return ret;
+}
+
+TEST(FPPTest, TestBloomFilter) {
+  // It counts the number of times FindHash returns true.
+  int exist = 0;
+
+  // Total count of elements that will be used
+  const int total_count = 100000;
+
+  // Bloom filter fpp parameter
+  const double fpp = 0.01;
+
+  std::vector<std::string> members;
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));
+
+  // Insert elements into the Bloom filter
+  for (int i = 0; i < total_count; i++) {
+    // Insert a random string whose length is 8
+    std::string tmp = GetRandomString(8);
+    const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+    members.push_back(tmp);
+    bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
+  }
+
+  for (int i = 0; i < total_count; i++) {
+    const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
+    ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
+    std::string tmp = GetRandomString(7);
+    const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+
+    if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
+      exist++;
+    }
+  }
+
+  // The count should probably be less than 1000 according to the default FPP of 0.01.
+  EXPECT_LT(exist, total_count * fpp);
+}
+
+// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
+// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
+// and tests whether the values inserted before could be filtered or not.
+
+// The Bloom filter binary is generated in three steps from parquet-mr.
+// Step 1: Construct a Bloom filter with 1024 bytes bitset.
+// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
+// Step 3: Call writeTo API to write to File.
+TEST(CompatibilityTest, TestBloomFilter) {
+  const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
+  const std::string bloom_filter_test_binary =
+      std::string(test::get_data_dir()) + "/bloom_filter.bin";
+  std::shared_ptr<::arrow::io::ReadableFile> handle;
+
+  int64_t size;
+  PARQUET_THROW_NOT_OK(
+      ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
+  PARQUET_THROW_NOT_OK(handle->GetSize(&size));
+
+  // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
+  EXPECT_EQ(size, 1036);
+
+  std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
+  std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
+  PARQUET_THROW_NOT_OK(handle->Read(size, &buffer));
+
+  InMemoryInputStream source(buffer);
+  BlockSplitBloomFilter bloom_filter1 =
+      BlockSplitBloomFilter::Deserialize(&source);
+
+  for (int i = 0; i < 4; i++) {
+    const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
+                        reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
+  }
+
+  // The following is used to check whether the new created Bloom filter in parquet-cpp is
+  // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
+  // with same inserted hashes.
+  BlockSplitBloomFilter bloom_filter2;
+  bloom_filter2.Init(bloom_filter1.GetBitsetSize());
+  for (int i = 0; i < 4; i++) {
+    const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
+                               reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter2.WriteTo(&sink);
+  std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();
+
+  PARQUET_THROW_NOT_OK(handle->Seek(0));
+  PARQUET_THROW_NOT_OK(handle->GetSize(&size));
+  std::shared_ptr<Buffer> buffer2;
+  PARQUET_THROW_NOT_OK(handle->Read(size, &buffer2));
+
+  EXPECT_TRUE((*buffer1).Equals(*buffer2));
+}
+
+// OptimalValueTest is used to test whether OptimalNumOfBits returns expected
+// numbers according to formula:
+// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive probability.
+// Also it is used to test whether OptimalNumOfBits returns value between
+// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
+TEST(OptimalValueTest, TestBloomFilter) {
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));
+
+  // Boundary check
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));
+
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
+      UINT32_C(1073741824));
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
+      UINT32_C(1073741824));
+}
+
+}  // namespace test
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.cc b/src/parquet/bloom_filter.cc
new file mode 100644
index 00000000..faa344cb
--- /dev/null
+++ b/src/parquet/bloom_filter.cc
@@ -0,0 
+1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+
+#include "arrow/status.h"
+#include "arrow/util/bit-util.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/murmur3.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
+
+BlockSplitBloomFilter::BlockSplitBloomFilter()
+    : pool_(::arrow::default_memory_pool()),
+      hash_strategy_(HashStrategy::MURMUR3_X64_128),
+      algorithm_(Algorithm::BLOCK) {}
+
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+  if (num_bytes < kMinimumBloomFilterBytes) {
+    num_bytes = kMinimumBloomFilterBytes;
+  }
+
+  // Clamp to the upper bound before rounding: NextPower2 of a value above 2^31 is
+  // 2^32, which truncates to 0 when narrowed to uint32_t. The upper bound is itself
+  // a power of 2, so rounding up afterwards cannot exceed it.
+  if (num_bytes > kMaximumBloomFilterBytes) {
+    num_bytes = kMaximumBloomFilterBytes;
+  }
+
+  // Get next power of 2 if it is not power of 2.
+  if ((num_bytes & (num_bytes - 1)) != 0) {
+    num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memset(data_->mutable_data(), 0, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+  DCHECK(bitset != nullptr);
+
+  if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
+      (num_bytes & (num_bytes - 1)) != 0) {
+    throw ParquetException("Given length of bitset is illegal");
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memcpy(data_->mutable_data(), bitset, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(InputStream* input) {
+  int64_t bytes_available;
+
+  const uint8_t* read_buffer = nullptr;
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t len;
+  memcpy(&len, read_buffer, sizeof(uint32_t));
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t hash;
+  memcpy(&hash, read_buffer, sizeof(uint32_t));
+  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
+    throw ParquetException("Unsupported hash strategy");
+  }
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t algorithm;
+  memcpy(&algorithm, read_buffer, sizeof(uint32_t));
+  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
+    throw ParquetException("Unsupported Bloom filter algorithm");
+  }
+
+  // Read the bitset and make sure all of it is available before Init copies it.
+  read_buffer = input->Read(len, &bytes_available);
+  if (bytes_available != static_cast<int64_t>(len) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(read_buffer, len);
+  return bloom_filter;
+}
+
+void BlockSplitBloomFilter::WriteTo(OutputStream* sink) const {
+  DCHECK(sink != nullptr);
+
+  sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(hash_strategy_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_));
+  sink->Write(data_->data(), num_bytes_);
+}
+
+void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = key * SALT[i];
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = block_mask.item[i] >> 27;
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
+  }
+}
+
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
+  uint32_t key = static_cast<uint32_t>(hash);
+  const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
+  uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; i++) {
+    bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
+  }
+}
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.h b/src/parquet/bloom_filter.h
new file mode 100644
index 00000000..e39370a4
--- /dev/null
+++ b/src/parquet/bloom_filter.h
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_BLOOM_FILTER_H
+#define PARQUET_BLOOM_FILTER_H
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+
+#include "arrow/util/bit-util.h"
+#include "parquet/exception.h"
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+class OutputStream;
+
+// A Bloom filter is a compact structure to indicate whether an item is not in a set or
+// probably in a set. The Bloom filter usually consists of a bit set that represents a
+// set of elements, a hash strategy and a Bloom filter algorithm.
+class BloomFilter {
+ public:
+  // Maximum Bloom filter size, it sets to HDFS default block size 128MB
+  // This value will be reconsidered when implementing Bloom filter producer.
+  static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+  /// Determine whether an element exists in set or not.
+  ///
+  /// @param hash the hash of the element to look up.
+  /// @return false if value is definitely not in set, and true means PROBABLY
+  /// in set.
+  virtual bool FindHash(uint64_t hash) const = 0;
+
+  /// Insert element to set represented by Bloom filter bitset.
+  /// @param hash the hash of value to insert into Bloom filter.
+  virtual void InsertHash(uint64_t hash) = 0;
+
+  /// Write this Bloom filter to an output stream. A Bloom filter structure should
+  /// include bitset length, hash strategy, algorithm, and bitset.
+  ///
+  /// @param sink the output stream to write
+  virtual void WriteTo(OutputStream* sink) const = 0;
+
+  /// Get the number of bytes of bitset
+  virtual uint32_t GetBitsetSize() const = 0;
+
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  /// @param value the value to hash.
+  /// @param len the length in bytes of the fixed byte array.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~BloomFilter() {}
+
+ protected:
+  // Hash strategy available for Bloom filter.
+  enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+
+  // Bloom filter algorithm.
+  enum class Algorithm : uint32_t { BLOCK = 0 };
+};
+
+// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
+// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
+//
+// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter : public BloomFilter {
+ public:
+  /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+  BlockSplitBloomFilter();
+
+  /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+  /// rounded up/down to lower/upper bound if num_bytes is out of range and also
+  /// will be rounded up to a power of 2.
+  ///
+  /// @param num_bytes The number of bytes to store Bloom filter bitset.
+  void Init(uint32_t num_bytes);
+
+  /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+  /// bitset because the given bitset may not satisfy the 32-byte alignment requirement
+  /// which may lead to segfault when performing SIMD instructions. It is the caller's
+  /// responsibility to free the bitset passed in. This is used when reconstructing
+  /// a Bloom filter from a parquet file.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @param num_bytes The number of bytes of given bitset.
+  void Init(const uint8_t* bitset, uint32_t num_bytes);
+
+  // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+  /// Calculate optimal size according to the number of distinct values and false
+  /// positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability.
+  /// @return the number of bits, always a power of 2 between
+  /// kMinimumBloomFilterBytes * 8 and kMaximumBloomFilterBytes * 8
+  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+    DCHECK(fpp > 0.0 && fpp < 1.0);
+    const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+    uint32_t num_bits;
+
+    // Handle overflow. Converting an out-of-range double to uint32_t is undefined
+    // behavior, so only cast once m is known to fit.
+    if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+      num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+    } else {
+      num_bits = static_cast<uint32_t>(m);
+    }
+
+    // Round up to lower bound
+    if (num_bits < kMinimumBloomFilterBytes << 3) {
+      num_bits = kMinimumBloomFilterBytes << 3;
+    }
+
+    // Get next power of 2 if bits is not power of 2.
+    if ((num_bits & (num_bits - 1)) != 0) {
+      num_bits = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bits));
+    }
+
+    // Round down to upper bound
+    if (num_bits > kMaximumBloomFilterBytes << 3) {
+      num_bits = kMaximumBloomFilterBytes << 3;
+    }
+
+    return num_bits;
+  }
+
+  bool FindHash(uint64_t hash) const override;
+  void InsertHash(uint64_t hash) override;
+  void WriteTo(OutputStream* sink) const override;
+  uint32_t GetBitsetSize() const override { return num_bytes_; }
+  uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(float value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(double value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const FLBA* value, uint32_t len) const override {
+    return hasher_->Hash(value, len);
+  }
+  /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
+  /// a Bloom filter from a parquet file.
+  ///
+  /// @param input_stream The input stream from which to construct the Bloom filter
+  /// @return The BlockSplitBloomFilter.
+  static BlockSplitBloomFilter Deserialize(InputStream* input_stream);
+
+ private:
+  // Bytes in a tiny Bloom filter block.
+  static constexpr int kBytesPerFilterBlock = 32;
+
+  // The number of bits to be set in each tiny Bloom filter
+  static constexpr int kBitsSetPerBlock = 8;
+
+  // A mask structure used to set bits in each tiny Bloom filter.
+  struct BlockMask {
+    uint32_t item[kBitsSetPerBlock];
+  };
+
+  // The block-based algorithm needs eight odd SALT values to calculate eight indexes
+  // of bit to set, one bit in each 32-bit word.
+  static constexpr uint32_t SALT[kBitsSetPerBlock] = {
+      0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+  /// Set bits in mask array according to input key.
+  /// @param key the value to calculate mask values.
+  /// @param mask the mask array is used to set inside a block
+  void SetMask(uint32_t key, BlockMask& mask) const;
+
+  // Memory pool to allocate aligned buffer for bitset
+  ::arrow::MemoryPool* pool_;
+
+  // The underlying buffer of bitset.
+  std::shared_ptr<Buffer> data_;
+
+  // The number of bytes of Bloom filter bitset.
+  uint32_t num_bytes_;
+
+  // Hash strategy used in this Bloom filter.
+  HashStrategy hash_strategy_;
+
+  // Algorithm used in this Bloom filter.
+  Algorithm algorithm_;
+
+  // The hash pointer points to actual hash class used.
+  std::unique_ptr<Hasher> hasher_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_BLOOM_FILTER_H
diff --git a/src/parquet/hasher.h b/src/parquet/hasher.h
new file mode 100644
index 00000000..dc316a03
--- /dev/null
+++ b/src/parquet/hasher.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_HASHER_H
+#define PARQUET_HASHER_H
+
+#include <cstdint>
+#include "parquet/types.h"
+
+namespace parquet {
+// Abstract class for hash
+class Hasher {
+ public:
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  /// @param value the value to hash.
+  /// @param len the length in bytes of the fixed byte array.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~Hasher() = default;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_HASHER_H
diff --git a/src/parquet/murmur3.cc b/src/parquet/murmur3.cc
new file mode 100644
index 00000000..a19436e3
--- /dev/null
+++ b/src/parquet/murmur3.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "parquet/murmur3.h"
+
+namespace parquet {
+
+#if defined(_MSC_VER)
+
+#define FORCE_INLINE __forceinline
+#define ROTL64(x, y) _rotl64(x, y)
+
+#else  // defined(_MSC_VER)
+
+#define FORCE_INLINE inline __attribute__((always_inline))
+inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
+#define ROTL64(x, y) rotl64(x, y)
+
+#endif  // !defined(_MSC_VER)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
+
+FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+FORCE_INLINE uint32_t fmix32(uint32_t h) {
+  h ^= h >> 16;
+  h *= 0x85ebca6b;
+  h ^= h >> 13;
+  h *= 0xc2b2ae35;
+  h ^= h >> 16;
+
+  return h;
+}
+
+//----------
+
+FORCE_INLINE uint64_t fmix64(uint64_t k) {
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+  k ^= k >> 33;
+
+  return k;
+}
+
+//-----------------------------------------------------------------------------
+
+void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
+  const uint8_t* data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+
+  uint64_t h1 = seed;
+  uint64_t h2 = seed;
+
+  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+  //----------
+  // body
+
+  const uint64_t* blocks = (const uint64_t*)(data);
+
+  for (int i = 0; i < nblocks; i++) {
+    uint64_t k1 = getblock64(blocks, i * 2 + 0);
+    uint64_t k2 = getblock64(blocks, i * 2 + 1);
+
+    k1 *= c1;
+    k1 = ROTL64(k1, 31);
+    k1 *= c2;
+    h1 ^= k1;
+
+    h1 = ROTL64(h1, 27);
+    h1 += h2;
+    h1 = h1 * 5 + 0x52dce729;
+
+    k2 *= c2;
+    k2 = ROTL64(k2, 33);
+    k2 *= c1;
+    h2 ^= k2;
+
+    h2 = ROTL64(h2, 31);
+    h2 += h1;
+    h2 = h2 * 5 + 0x38495ab5;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
+
+  uint64_t k1 = 0;
+  uint64_t k2 = 0;
+
+  switch (len & 15) {
+    case 15:
+      k2 ^= ((uint64_t)tail[14]) << 48;
+    case 14:
+      k2 ^= ((uint64_t)tail[13]) << 40;
+    case 13:
+      k2 ^= ((uint64_t)tail[12]) << 32;
+    case 12:
+      k2 ^= ((uint64_t)tail[11]) << 24;
+    case 11:
+      k2 ^= ((uint64_t)tail[10]) << 16;
+    case 10:
+      k2 ^= ((uint64_t)tail[9]) << 8;
+    case 9:
+      k2 ^= ((uint64_t)tail[8]) << 0;
+      k2 *= c2;
+      k2 = ROTL64(k2, 33);
+      k2 *= c1;
+      h2 ^= k2;
+
+    case 8:
+      k1 ^= ((uint64_t)tail[7]) << 56;
+    case 7:
+      k1 ^= ((uint64_t)tail[6]) << 48;
+    case 6:
+      k1 ^= ((uint64_t)tail[5]) << 40;
+    case 5:
+      k1 ^= ((uint64_t)tail[4]) << 32;
+    case 4:
+      k1 ^= ((uint64_t)tail[3]) << 24;
+    case 3:
+      k1 ^= ((uint64_t)tail[2]) << 16;
+    case 2:
+      k1 ^= ((uint64_t)tail[1]) << 8;
+    case 1:
+      k1 ^= ((uint64_t)tail[0]) << 0;
+      k1 *= c1;
+      k1 = ROTL64(k1, 31);
+      k1 *= c2;
+      h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len;
+  h2 ^= len;
+
+  h1 += h2;
+  h2 += h1;
+
+  h1 = fmix64(h1);
+  h2 = fmix64(h2);
+
+  h1 += h2;
+  h2 += h1;
+
+  out[0] = h1;
+  out[1] = h2;
+}
+
+template <typename T>
+uint64_t HashHelper(T value, uint32_t seed) {
+  uint64_t output[2];
+  Hash_x64_128(reinterpret_cast<const void*>(&value), sizeof(T), seed, output);
+  return output[0];
+}
+
+uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const Int96* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
+               out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const ByteArray* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
+  return out[0];
+}
+
+}  // namespace parquet
diff --git a/src/parquet/murmur3.h b/src/parquet/murmur3.h
new file mode 100644
index 00000000..84792f36
--- /dev/null
+++ b/src/parquet/murmur3.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+ +#ifndef PARQUET_MURMURHASH3_H_ +#define PARQUET_MURMURHASH3_H_ + +#include + +#include "parquet/hasher.h" +#include "parquet/types.h" + +namespace parquet { + +/// Source: +/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp +/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class) +class MurmurHash3 : public Hasher { + public: + MurmurHash3() : seed_(DEFAULT_SEED) {} + uint64_t Hash(int32_t value) const override; + uint64_t Hash(int64_t value) const override; + uint64_t Hash(float value) const override; + uint64_t Hash(double value) const override; + uint64_t Hash(const Int96* value) const override; + uint64_t Hash(const ByteArray* value) const override; + uint64_t Hash(const FLBA* val, uint32_t len) const override; + + private: + // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated + // by System.nanoTime() of java. + static constexpr int DEFAULT_SEED = 1361930890; + + uint32_t seed_; +}; + +} // namespace parquet + +#endif // PARQUET_MURMURHASH3_H_