This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -674,13 +674,15 @@ set(LIBPARQUET_SRCS
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
src/parquet/bloom_filter.cc
src/parquet/column_reader.cc
src/parquet/column_scanner.cc
src/parquet/column_writer.cc
src/parquet/exception.cc
src/parquet/file_reader.cc
src/parquet/file_writer.cc
src/parquet/metadata.cc
src/parquet/murmur3.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
src/parquet/printer.cc
Member

What is this binary data file and can it be generated rather than checked into the repo?

Author

The binary file is used to ensure cross compatibility between parquet-mr and parquet-cpp.

Member

I think we should store binary files someplace else, like in a parquet-compatibility repo. I can request ASF Infra to create such a repository if you think that's a good idea

Author

Thanks, I agree that's a good idea. Other Parquet implementations can benefit from that as well. How long will it take to create such a repo, and does it have any dependencies? I have no experience with this.

Binary file added data/bloom_filter.bin
Binary file not shown.
4 changes: 4 additions & 0 deletions src/parquet/CMakeLists.txt
Expand Up @@ -17,6 +17,7 @@

# Headers: top level
install(FILES
bloom_filter.h
column_reader.h
column_page.h
column_scanner.h
Expand All @@ -25,7 +26,9 @@ install(FILES
exception.h
file_reader.h
file_writer.h
hasher.h
metadata.h
murmur3.h
printer.h
properties.h
schema.h
Expand All @@ -50,6 +53,7 @@ install(FILES
"${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")

ADD_PARQUET_TEST(bloom_filter-test)
ADD_PARQUET_TEST(column_reader-test)
ADD_PARQUET_TEST(column_scanner-test)
ADD_PARQUET_TEST(column_writer-test)
10 changes: 10 additions & 0 deletions src/parquet/README
@@ -0,0 +1,10 @@
The CompatibilityTest in bloom_filter-test.cc tests cross compatibility of Bloom
filters between parquet-mr and parquet-cpp. It reads a Bloom filter binary generated
by the Bloom filter class in the parquet-mr project and checks that the values
inserted there are found by the parquet-cpp filter.

The Bloom filter binary is generated from parquet-mr in three steps:
Step 1: Construct a Bloom filter with a 1024-byte bitset.
Step 2: Insert the hashes of the strings "hello", "parquet", "bloom", and "filter"
into the Bloom filter by calling the hash and insert APIs.
Step 3: Call the writeTo API to write the Bloom filter to a file.
Member

I am not sure this is the right place for this. Put this comment in bloom_filter-test.cc?

Author

We already have this doc in bloom_filter-test.cc.

241 changes: 241 additions & 0 deletions src/parquet/bloom_filter-test.cc
@@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <vector>

#include "arrow/io/file.h"
#include "parquet/bloom_filter.h"
#include "parquet/murmur3.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"

namespace parquet {
namespace test {
TEST(Murmur3Test, TestBloomFilter) {
uint64_t result;
const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
ByteArray byteArray(8, bitset);
MurmurHash3 murmur3;
result = murmur3.Hash(&byteArray);
EXPECT_EQ(result, UINT64_C(913737700387071329));
}

TEST(ConstructorTest, TestBloomFilter) {
BlockSplitBloomFilter bloom_filter;
EXPECT_NO_THROW(bloom_filter.Init(1000));

// It throws because the length cannot be zero
std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);

// It throws because the number of bytes of Bloom filter bitset must be a power of 2.
std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
}

// The BasicTest is used to test basic operations including InsertHash, FindHash and
// serializing and de-serializing.
TEST(BasicTest, TestBloomFilter) {
BlockSplitBloomFilter bloom_filter;
bloom_filter.Init(1024);

for (int i = 0; i < 10; i++) {
bloom_filter.InsertHash(bloom_filter.Hash(i));
}

for (int i = 0; i < 10; i++) {
EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
}

// Serialize Bloom filter to memory output stream
InMemoryOutputStream sink;
bloom_filter.WriteTo(&sink);

// Deserialize Bloom filter from memory

This deserialization routine is used below in CompatibilityTest. Can they share a helper function, please?

I believe this was discussed in the last code review. When code review comments are not responded to, it is hard to know if the author disagrees with them or thinks they are in the most recent version of the patch.

Author

Sorry, I may have misunderstood this. I asked whether we could perform the serde and find/insert tests only in this test, and you agreed. So I removed the serde test from the compatibility test and skipped creating a helper function.

And that's fine, but now that you are deserializing twice, please make a helper function.

InMemoryInputStream source(sink.GetBuffer());

// No std::move needed: the returned temporary already initializes de_bloom directly.
BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source);

for (int i = 0; i < 10; i++) {
EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
}
}
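
The insert-then-find round trip exercised by BasicTest can be illustrated without the parquet headers. What follows is a minimal sketch of a block-split Bloom filter, not the `BlockSplitBloomFilter` class from this patch: the class name `SketchBlockBloomFilter`, the 32-byte block layout, the way the 64-bit hash is split, and the salt constants are all assumptions based on the block-split scheme described in the Parquet format documentation.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sketch: the bitset is divided into 32-byte blocks of eight
// 32-bit words, and each inserted hash sets one bit in every word of a block.
class SketchBlockBloomFilter {
 public:
  // num_bytes must be a power of 2 and hold at least one 32-byte block.
  explicit SketchBlockBloomFilter(uint32_t num_bytes)
      : words_(num_bytes / 4, 0), num_blocks_(num_bytes / 32) {
    assert(num_bytes >= 32 && (num_bytes & (num_bytes - 1)) == 0);
  }

  void InsertHash(uint64_t hash) {
    const uint32_t block = BlockIndex(hash);
    const uint32_t key = static_cast<uint32_t>(hash);
    for (int i = 0; i < 8; ++i) {
      words_[block * 8 + i] |= Mask(key, i);
    }
  }

  bool FindHash(uint64_t hash) const {
    const uint32_t block = BlockIndex(hash);
    const uint32_t key = static_cast<uint32_t>(hash);
    for (int i = 0; i < 8; ++i) {
      if ((words_[block * 8 + i] & Mask(key, i)) == 0) return false;
    }
    return true;
  }

 private:
  // Assumption: the upper 32 bits of the hash select the block. Because
  // num_blocks_ is a power of 2, masking is equivalent to a modulo.
  uint32_t BlockIndex(uint64_t hash) const {
    return static_cast<uint32_t>(hash >> 32) & (num_blocks_ - 1);
  }

  // The top 5 bits of key * salt pick one of the 32 bit positions in word i.
  static uint32_t Mask(uint32_t key, int i) {
    static const uint32_t kSalt[8] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU,
                                      0xa2b7289dU, 0x705495c7U, 0x2df1424bU,
                                      0x9efc4947U, 0x5c6bfb31U};
    return UINT32_C(1) << ((key * kSalt[i]) >> 27);
  }

  std::vector<uint32_t> words_;
  uint32_t num_blocks_;
};
```

A filter built this way cannot report a false negative, because `FindHash` checks exactly the bits that `InsertHash` set for the same hash. It can still report false positives, which is what FPPTest measures.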

// Helper function to generate random string.
std::string GetRandomString(uint32_t length) {
// Character set used to generate random string
const std::string charset =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

// The uuid_seed was generated by "uuidgen -r"
const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());

Can this be const? Please add const to local variables when possible.

Author

This can't be const.

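// Keep one engine alive across calls. Constructing it on every call with the same
// seed would restart the sequence and return the same string each time.
static std::mt19937 generator(seed);
std::uniform_int_distribution<uint32_t> dist(
0, static_cast<uint32_t>(charset.size() - 1));
std::string ret;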

for (uint32_t i = 0; i < length; i++) {
ret += charset[dist(generator)];
}

return ret;
}
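
A fixed seed makes the generated strings reproducible, but where the engine lives decides whether successive calls differ. The sketch below contrasts the two options using only the standard library. The function names and the seed value 42 are illustrative assumptions, not part of this patch.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <string>

// Draws `length` characters from the charset using the caller's engine.
std::string RandomStringFrom(std::mt19937* generator, uint32_t length) {
  assert(generator != nullptr);
  static const std::string charset =
      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
  std::uniform_int_distribution<uint32_t> dist(
      0, static_cast<uint32_t>(charset.size() - 1));
  std::string ret;
  ret.reserve(length);
  for (uint32_t i = 0; i < length; ++i) {
    ret += charset[dist(*generator)];
  }
  return ret;
}

// Engine constructed per call: the sequence restarts every time, so every
// call with the same length returns the same string.
std::string ReseededString(uint32_t length) {
  std::mt19937 generator(42);
  return RandomStringFrom(&generator, length);
}

// Engine constructed once: its state advances between calls, so the calls
// return different strings while the whole run stays reproducible.
std::string SharedEngineString(uint32_t length) {
  static std::mt19937 generator(42);
  return RandomStringFrom(&generator, length);
}
```

For a false positive measurement, the second form is the one that yields many distinct members; the first would insert the same element over and over.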

TEST(FPPTest, TestBloomFilter) {
// It counts the number of times FindHash returns true.
int exist = 0;

// Total count of elements that will be used
const int total_count = 100000;

// Bloom filter fpp parameter
const double fpp = 0.01;

std::vector<std::string> members;
BlockSplitBloomFilter bloom_filter;
bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));

// Insert elements into the Bloom filter
for (int i = 0; i < total_count; i++) {
// Insert a random string of length 8
std::string tmp = GetRandomString(8);
const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
members.push_back(tmp);
bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
}

for (int i = 0; i < total_count; i++) {
const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
std::string tmp = GetRandomString(7);
const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));

if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
exist++;
}
}

// With fpp = 0.01 and 100000 lookups, the number of false positives should very
// likely be below 1000.
EXPECT_LT(exist, total_count * fpp);
}

// The CompatibilityTest tests cross compatibility with parquet-mr. It reads the Bloom
// filter binary generated by the Bloom filter class in the parquet-mr project and
// checks that the values inserted there are found by the parquet-cpp filter.

// The Bloom filter binary is generated from parquet-mr in three steps:
// Step 1: Construct a Bloom filter with a 1024-byte bitset.
// Step 2: Insert "hello", "parquet", "bloom", and "filter" into the Bloom filter.
// Step 3: Call the writeTo API to write the Bloom filter to a file.
TEST(CompatibilityTest, TestBloomFilter) {
const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
const std::string bloom_filter_test_binary =
std::string(test::get_data_dir()) + "/bloom_filter.bin";
std::shared_ptr<::arrow::io::ReadableFile> handle;

int64_t size;
PARQUET_THROW_NOT_OK(
::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
PARQUET_THROW_NOT_OK(handle->GetSize(&size));

// 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
EXPECT_EQ(size, 1036);

// Read allocates the result buffer itself, so no pre-allocated storage is needed.
std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(handle->Read(size, &buffer));

InMemoryInputStream source(buffer);
BlockSplitBloomFilter bloom_filter1 = BlockSplitBloomFilter::Deserialize(&source);

for (int i = 0; i < 4; i++) {
const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
}

// The following checks that a Bloom filter newly created in parquet-cpp with the same
// inserted hashes is byte-for-byte identical to the file at bloom_filter_test_binary,
// which was created by parquet-mr.
BlockSplitBloomFilter bloom_filter2;
bloom_filter2.Init(bloom_filter1.GetBitsetSize());
for (int i = 0; i < 4; i++) {
const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
}

// Serialize Bloom filter to memory output stream
InMemoryOutputStream sink;
bloom_filter2.WriteTo(&sink);
std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();

PARQUET_THROW_NOT_OK(handle->Seek(0));
PARQUET_THROW_NOT_OK(handle->GetSize(&size));
std::shared_ptr<Buffer> buffer2;
PARQUET_THROW_NOT_OK(handle->Read(size, &buffer2));

EXPECT_TRUE(buffer1->Equals(*buffer2));
}
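
The expected file size in CompatibilityTest is plain arithmetic: 1024 bitset bytes plus three 4-byte fields gives 1036. The sketch below shows how such a size check could look on a raw byte buffer. It is not the `Deserialize` implementation from this patch. The struct and function names are hypothetical, and the field order (length, hash, algorithm, then bitset) and little-endian encoding are assumptions, since the test comment only states the field sizes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical view of the three 4-byte fields that accompany the bitset.
struct SketchHeader {
  uint32_t num_bytes;  // length of the bitset in bytes
  uint32_t hash;       // hash strategy identifier
  uint32_t algorithm;  // filter algorithm identifier
};

constexpr std::size_t kSketchHeaderSize = 3 * sizeof(uint32_t);

// Decodes a 32-bit little-endian integer byte by byte, independent of the
// host byte order.
uint32_t ReadLE32(const uint8_t* p) {
  return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
         (static_cast<uint32_t>(p[2]) << 16) | (static_cast<uint32_t>(p[3]) << 24);
}

// Fills *out and returns true only when the buffer holds the header plus
// exactly num_bytes of bitset.
bool ParseSketchHeader(const std::vector<uint8_t>& data, SketchHeader* out) {
  assert(out != nullptr);
  if (data.size() < kSketchHeaderSize) return false;
  out->num_bytes = ReadLE32(&data[0]);
  out->hash = ReadLE32(&data[4]);
  out->algorithm = ReadLE32(&data[8]);
  return data.size() == kSketchHeaderSize + out->num_bytes;
}
```

With a 1024-byte bitset, the only accepted buffer size is 12 + 1024 = 1036 bytes, which matches the value the test expects from `GetSize`.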

// OptimalValueTest tests whether OptimalNumOfBits returns the expected numbers
// according to the formula:
// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
// where ndv is the number of distinct values and fpp is the false positive probability.
// It also tests that OptimalNumOfBits returns a value in the range
// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
TEST(OptimalValueTest, TestBloomFilter) {
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));

EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));

EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));

EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));

// Boundary check
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));

EXPECT_EQ(
BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
UINT32_C(1073741824));
EXPECT_EQ(
BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
UINT32_C(1073741824));
}

} // namespace test

} // namespace parquet
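
The expectations in OptimalValueTest are all powers of 2, while the formula in its comment generally is not. For ndv = 256 and fpp = 0.01, for example, the formula gives about 2478 bits, and the test expects 4096. The sketch below reproduces the tested values under three assumptions inferred from those expectations rather than from the implementation: the raw result is rounded up to a power of 2, the minimum is 256 bits, and the maximum is 2^30 bits. The function and constant names are hypothetical.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Assumed bounds, inferred from the boundary checks in the test.
constexpr uint32_t kSketchMinBits = 256;
constexpr uint32_t kSketchMaxBits = UINT32_C(1) << 30;

// Evaluates -8 * ndv / log(1 - fpp^(1/8)), then rounds up to a power of 2
// within [kSketchMinBits, kSketchMaxBits].
uint32_t SketchOptimalNumOfBits(uint32_t ndv, double fpp) {
  assert(fpp > 0.0 && fpp < 1.0);
  const double raw = -8.0 * ndv / std::log(1 - std::pow(fpp, 1.0 / 8.0));
  if (raw >= kSketchMaxBits) return kSketchMaxBits;
  uint32_t bits = kSketchMinBits;
  // raw is below 2^30 here, so doubling stops before the shift can overflow.
  while (bits < raw) bits <<= 1;
  return bits;
}
```

Rounding to a power of 2 is consistent with the constructor test, which rejects a 1023-byte bitset because the size must be a power of 2.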