Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion velox/docs/functions/iceberg/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,15 @@ Refer to `Iceberg documenation <https://iceberg.apache.org/spec/#partition-trans

.. iceberg:function:: bucket(numBuckets, input) -> integer

Not implemented.
Returns an integer between 0 and numBuckets - 1, indicating the assigned bucket.
Bucket partitioning is based on a 32-bit hash of the input, specifically using the x86
variant of the Murmur3 hash function with a seed of 0.

The function can be expressed in pseudo-code as below. ::

def bucket(numBuckets, input)= (murmur3_x86_32_hash(input) & Integer.MAX_VALUE) % numBuckets

The ``numBuckets`` is of type INTEGER and must be greater than 0. Otherwise, an exception is thrown.
Supported types for ``input`` are INTEGER, BIGINT, DECIMAL, DATE, TIMESTAMP, VARCHAR, VARBINARY. ::
SELECT bucket(128, 'abcd'); -- 4
SELECT bucket(100, 34L); -- 79
110 changes: 110 additions & 0 deletions velox/functions/iceberg/BucketFunction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/functions/iceberg/BucketFunction.h"
#include "velox/functions/Macros.h"
#include "velox/functions/Registerer.h"
#include "velox/functions/iceberg/Murmur3Hash32.h"

namespace facebook::velox::functions::iceberg {
namespace {

FOLLY_ALWAYS_INLINE int32_t
getBucketIndex(int32_t numBuckets, int32_t hashedValue) {
return (hashedValue & std::numeric_limits<int32_t>::max()) % numBuckets;
}

// bucket(numBuckets, decimal) -> bucketIndex
// Hash the minimal bytes of the decimal unscaled value, then MOD numBuckets to
// get the bucket index.
template <typename TExec>
struct BucketDecimalFunction {
VELOX_DEFINE_FUNCTION_TYPES(TExec);

template <typename T>
FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) {
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return Status::UserError("Invalid number of buckets");

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new API VELOX_USER_RETURN_LE, it returns UserError inside the API

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

char bytes[sizeof(int128_t)];
const auto length = DecimalUtil::toByteArray(input, bytes);
const auto hash = Murmur3Hash32::hashBytes(bytes, length);
out = getBucketIndex(numBuckets, hash);
return Status::OK();
}
};

// bucket(numBuckets, input) -> bucketIndex
// Bucket all other Iceberg supported types including the integral type,
// varchar, varbinary, date and timestamp type.
template <typename TExec>
struct BucketFunction {
VELOX_DEFINE_FUNCTION_TYPES(TExec);

// Value of type INTEGER and BIGINT is treated as unsigned type.
// For the schema evolution, may promote int to int64, treat int32 as uint64.
template <typename T>
FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) {
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
const auto hash = Murmur3Hash32::hashInt64(input);
out = getBucketIndex(numBuckets, hash);
return Status::OK();
}

FOLLY_ALWAYS_INLINE Status
call(int32_t& out, int32_t numBuckets, const arg_type<Varchar>& input) {
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
const auto hash = Murmur3Hash32::hashBytes(input.data(), input.size());
out = getBucketIndex(numBuckets, hash);
return Status::OK();
}

FOLLY_ALWAYS_INLINE Status
call(int32_t& out, int32_t numBuckets, const arg_type<Timestamp>& input) {
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
const auto hash = Murmur3Hash32::hashInt64(input.toMicros());
out = getBucketIndex(numBuckets, hash);
return Status::OK();
}
};

} // namespace

void registerBucketFunctions(const std::string& prefix) {
registerFunction<BucketFunction, int32_t, int32_t, int32_t>(
{prefix + "bucket"});
registerFunction<BucketFunction, int32_t, int32_t, int64_t>(
{prefix + "bucket"});
registerFunction<BucketFunction, int32_t, int32_t, Varchar>(
{prefix + "bucket"});
registerFunction<BucketFunction, int32_t, int32_t, Varbinary>(
{prefix + "bucket"});
registerFunction<BucketFunction, int32_t, int32_t, Date>({prefix + "bucket"});
registerFunction<BucketFunction, int32_t, int32_t, Timestamp>(
{prefix + "bucket"});

registerFunction<
BucketDecimalFunction,
int32_t,
int32_t,
LongDecimal<P1, S1>>({prefix + "bucket"});

registerFunction<
BucketDecimalFunction,
int32_t,
int32_t,
ShortDecimal<P1, S1>>({prefix + "bucket"});
}

} // namespace facebook::velox::functions::iceberg
24 changes: 24 additions & 0 deletions velox/functions/iceberg/BucketFunction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>

namespace facebook::velox::functions::iceberg {

void registerBucketFunctions(const std::string& prefix);

} // namespace facebook::velox::functions::iceberg
13 changes: 13 additions & 0 deletions velox/functions/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ velox_add_library(velox_functions_iceberg_hash Murmur3Hash32.cpp)

velox_link_libraries(velox_functions_iceberg_hash velox_common_base velox_functions_util)

velox_add_library(velox_functions_iceberg BucketFunction.cpp Register.cpp)

velox_link_libraries(
velox_functions_iceberg
velox_functions_iceberg_hash
velox_functions_lib
Folly::folly
)

if(NOT VELOX_MONO_LIBRARY)
set_property(TARGET velox_functions_iceberg PROPERTY JOB_POOL_COMPILE high_memory_pool)
endif()

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
26 changes: 26 additions & 0 deletions velox/functions/iceberg/Register.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/functions/iceberg/Register.h"
#include "velox/functions/iceberg/BucketFunction.h"

namespace facebook::velox::functions::iceberg {

void registerFunctions(const std::string& prefix) {
registerBucketFunctions(prefix);
}

} // namespace facebook::velox::functions::iceberg
24 changes: 24 additions & 0 deletions velox/functions/iceberg/Register.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>

namespace facebook::velox::functions::iceberg {

void registerFunctions(const std::string& prefix = "");

} // namespace facebook::velox::functions::iceberg
126 changes: 126 additions & 0 deletions velox/functions/iceberg/tests/BucketFunctionTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/functions/iceberg/BucketFunction.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/functions/iceberg/tests/IcebergFunctionBaseTest.h"

#include <gtest/gtest.h>

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::velox::functions::iceberg {
namespace {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests go into anonymous namespace inside the namespace that holds code being tested

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh Would you address this comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I misunderstand before, updated now


class BucketFunctionTest
: public functions::iceberg::test::IcebergFunctionBaseTest {
public:
BucketFunctionTest() {
options_.parseDecimalAsDouble = false;
}

protected:
template <typename T>
std::optional<int32_t> bucket(
std::optional<int32_t> numBuckets,
std::optional<T> value) {
return evaluateOnce<int32_t>("bucket(c0, c1)", numBuckets, value);
}

template <typename T>
std::optional<int32_t> bucket(
const TypePtr& type,
std::optional<int32_t> numBuckets,
std::optional<T> value) {
return evaluateOnce<int32_t>(
"bucket(c0, c1)", {INTEGER(), type}, numBuckets, value);
}
};

TEST_F(BucketFunctionTest, integerTypes) {
EXPECT_EQ(bucket<int32_t>(10, 8), 3);
EXPECT_EQ(bucket<int32_t>(10, 42), 6);
EXPECT_EQ(bucket<int32_t>(100, 34), 79);
EXPECT_EQ(bucket<int32_t>(1000, INT_MAX), 606);
EXPECT_EQ(bucket<int32_t>(1000, INT_MIN), 856);
EXPECT_EQ(bucket<int64_t>(10, 8), 3);
EXPECT_EQ(bucket<int64_t>(10, 42), 6);
EXPECT_EQ(bucket<int64_t>(100, -34), 97);
EXPECT_EQ(bucket<int64_t>(2, -1), 0);
VELOX_ASSERT_THROW(
bucket<int64_t>(0, 34),
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
VELOX_ASSERT_THROW(
bucket<int64_t>(-3, 34),
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
}

TEST_F(BucketFunctionTest, string) {
EXPECT_EQ(bucket<std::string>(5, "abcdefg"), 4);
EXPECT_EQ(bucket<std::string>(128, "abc"), 122);
EXPECT_EQ(bucket<std::string>(128, "abcd"), 106);
EXPECT_EQ(bucket<std::string>(64, "abcde"), 54);
EXPECT_EQ(bucket<std::string>(12, "测试"), 8);
EXPECT_EQ(bucket<std::string>(16, "测试raul试测"), 1);
EXPECT_EQ(bucket<std::string>(16, ""), 0);
EXPECT_EQ(bucket<std::string>(16, "Товары"), 10);
EXPECT_EQ(bucket<std::string>(120, "😀"), 58);
VELOX_ASSERT_THROW(
bucket<std::string>(0, "abc"),
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
VELOX_ASSERT_THROW(
bucket<std::string>(-3, "abc"),
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
}

TEST_F(BucketFunctionTest, binary) {
EXPECT_EQ(bucket<StringView>(VARBINARY(), 128, "abc"), 122);
EXPECT_EQ(bucket<StringView>(VARBINARY(), 5, "abcdefg"), 4);
}

TEST_F(BucketFunctionTest, timestamp) {
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(1633046400, 0)), 17);
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(0, 123456789)), 0);
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(-62167219200, 0)), 5);
VELOX_ASSERT_THROW(
bucket<Timestamp>(0, Timestamp(-62167219200, 0)),
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
VELOX_ASSERT_THROW(
bucket<Timestamp>(-3, Timestamp(-62167219200, 0)),
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
}

TEST_F(BucketFunctionTest, date) {
EXPECT_EQ(bucket<int32_t>(DATE(), 10, 8), 3);
EXPECT_EQ(bucket<int32_t>(DATE(), 10, 42), 6);
}

TEST_F(BucketFunctionTest, decimal) {
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 64, 1234), 56);
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 18, 1230), 13);
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 16, 12999), 2);
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 128, 5), 85);
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 18, 5), 3);

EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 12), 7);
EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 0), 7);
EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 234), 6);
EXPECT_EQ(
bucket<int128_t>(DECIMAL(38, 2), 10, DecimalUtil::kLongDecimalMax), 4);
}

} // namespace
} // namespace facebook::velox::functions::iceberg
8 changes: 5 additions & 3 deletions velox/functions/iceberg/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_executable(velox_functions_iceberg_test Murmur3Hash32Test.cpp)
add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp Murmur3Hash32Test.cpp)

add_test(
NAME velox_functions_iceberg_test
Expand All @@ -21,8 +21,10 @@ add_test(

target_link_libraries(
velox_functions_iceberg_test
velox_functions_iceberg_hash
velox_functions_iceberg
velox_functions_test_lib
velox_expression
velox_memory
GTest::gtest
GTest::gtest_main
GTest::gmock
)
Loading
Loading