Skip to content

Commit 3f7a211

Browse files
feat(iceberg): Add bucket function (#13174)
Summary: The implementation aligns with https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/Bucket.java And described in Iceberg document https://iceberg.apache.org/spec/#partition-transforms Resolves: #13980 Pull Request resolved: #13174 Reviewed By: kKPulla Differential Revision: D81621760 Pulled By: Yuhta fbshipit-source-id: a8051cbb2676a8db0fef95e41c5858004941b7ce
1 parent 2599eef commit 3f7a211

File tree

9 files changed

+373
-4
lines changed

9 files changed

+373
-4
lines changed

velox/docs/functions/iceberg/functions.rst

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,15 @@ Refer to `Iceberg documenation <https://iceberg.apache.org/spec/#partition-trans
1010

1111
.. iceberg:function:: bucket(numBuckets, input) -> integer
1212
13-
Not implemented.
13+
Returns an integer between 0 and numBuckets - 1, indicating the assigned bucket.
14+
Bucket partitioning is based on a 32-bit hash of the input, specifically using the x86
15+
variant of the Murmur3 hash function with a seed of 0.
16+
17+
The function can be expressed in pseudo-code as below. ::
18+
19+
def bucket(numBuckets, input)= (murmur3_x86_32_hash(input) & Integer.MAX_VALUE) % numBuckets
20+
21+
The ``numBuckets`` is of type INTEGER and must be greater than 0. Otherwise, an exception is thrown.
22+
Supported types for ``input`` are INTEGER, BIGINT, DECIMAL, DATE, TIMESTAMP, VARCHAR, VARBINARY. ::
23+
SELECT bucket(128, 'abcd'); -- 4
24+
SELECT bucket(100, 34L); -- 79
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/functions/iceberg/BucketFunction.h"
18+
#include "velox/functions/Macros.h"
19+
#include "velox/functions/Registerer.h"
20+
#include "velox/functions/iceberg/Murmur3Hash32.h"
21+
22+
namespace facebook::velox::functions::iceberg {
23+
namespace {
24+
25+
FOLLY_ALWAYS_INLINE int32_t
26+
getBucketIndex(int32_t numBuckets, int32_t hashedValue) {
27+
return (hashedValue & std::numeric_limits<int32_t>::max()) % numBuckets;
28+
}
29+
30+
// bucket(numBuckets, decimal) -> bucketIndex
31+
// Hash the minimal bytes of the decimal unscaled value, then MOD numBuckets to
32+
// get the bucket index.
33+
template <typename TExec>
34+
struct BucketDecimalFunction {
35+
VELOX_DEFINE_FUNCTION_TYPES(TExec);
36+
37+
template <typename T>
38+
FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) {
39+
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
40+
char bytes[sizeof(int128_t)];
41+
const auto length = DecimalUtil::toByteArray(input, bytes);
42+
const auto hash = Murmur3Hash32::hashBytes(bytes, length);
43+
out = getBucketIndex(numBuckets, hash);
44+
return Status::OK();
45+
}
46+
};
47+
48+
// bucket(numBuckets, input) -> bucketIndex
49+
// Bucket all other Iceberg supported types including the integral type,
50+
// varchar, varbinary, date and timestamp type.
51+
template <typename TExec>
52+
struct BucketFunction {
53+
VELOX_DEFINE_FUNCTION_TYPES(TExec);
54+
55+
// Value of type INTEGER and BIGINT is treated as unsigned type.
56+
// For the schema evolution, may promote int to int64, treat int32 as uint64.
57+
template <typename T>
58+
FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) {
59+
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
60+
const auto hash = Murmur3Hash32::hashInt64(input);
61+
out = getBucketIndex(numBuckets, hash);
62+
return Status::OK();
63+
}
64+
65+
FOLLY_ALWAYS_INLINE Status
66+
call(int32_t& out, int32_t numBuckets, const arg_type<Varchar>& input) {
67+
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
68+
const auto hash = Murmur3Hash32::hashBytes(input.data(), input.size());
69+
out = getBucketIndex(numBuckets, hash);
70+
return Status::OK();
71+
}
72+
73+
FOLLY_ALWAYS_INLINE Status
74+
call(int32_t& out, int32_t numBuckets, const arg_type<Timestamp>& input) {
75+
VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets.");
76+
const auto hash = Murmur3Hash32::hashInt64(input.toMicros());
77+
out = getBucketIndex(numBuckets, hash);
78+
return Status::OK();
79+
}
80+
};
81+
82+
} // namespace
83+
84+
void registerBucketFunctions(const std::string& prefix) {
85+
registerFunction<BucketFunction, int32_t, int32_t, int32_t>(
86+
{prefix + "bucket"});
87+
registerFunction<BucketFunction, int32_t, int32_t, int64_t>(
88+
{prefix + "bucket"});
89+
registerFunction<BucketFunction, int32_t, int32_t, Varchar>(
90+
{prefix + "bucket"});
91+
registerFunction<BucketFunction, int32_t, int32_t, Varbinary>(
92+
{prefix + "bucket"});
93+
registerFunction<BucketFunction, int32_t, int32_t, Date>({prefix + "bucket"});
94+
registerFunction<BucketFunction, int32_t, int32_t, Timestamp>(
95+
{prefix + "bucket"});
96+
97+
registerFunction<
98+
BucketDecimalFunction,
99+
int32_t,
100+
int32_t,
101+
LongDecimal<P1, S1>>({prefix + "bucket"});
102+
103+
registerFunction<
104+
BucketDecimalFunction,
105+
int32_t,
106+
int32_t,
107+
ShortDecimal<P1, S1>>({prefix + "bucket"});
108+
}
109+
110+
} // namespace facebook::velox::functions::iceberg
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <string>
19+
20+
namespace facebook::velox::functions::iceberg {
21+
22+
void registerBucketFunctions(const std::string& prefix);
23+
24+
} // namespace facebook::velox::functions::iceberg

velox/functions/iceberg/CMakeLists.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,19 @@ velox_add_library(velox_functions_iceberg_hash Murmur3Hash32.cpp)
1616

1717
velox_link_libraries(velox_functions_iceberg_hash velox_common_base velox_functions_util)
1818

19+
velox_add_library(velox_functions_iceberg BucketFunction.cpp Register.cpp)
20+
21+
velox_link_libraries(
22+
velox_functions_iceberg
23+
velox_functions_iceberg_hash
24+
velox_functions_lib
25+
Folly::folly
26+
)
27+
28+
if(NOT VELOX_MONO_LIBRARY)
29+
set_property(TARGET velox_functions_iceberg PROPERTY JOB_POOL_COMPILE high_memory_pool)
30+
endif()
31+
1932
if(${VELOX_BUILD_TESTING})
2033
add_subdirectory(tests)
2134
endif()
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/functions/iceberg/Register.h"
18+
#include "velox/functions/iceberg/BucketFunction.h"
19+
20+
namespace facebook::velox::functions::iceberg {
21+
22+
void registerFunctions(const std::string& prefix) {
23+
registerBucketFunctions(prefix);
24+
}
25+
26+
} // namespace facebook::velox::functions::iceberg

velox/functions/iceberg/Register.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <string>
19+
20+
namespace facebook::velox::functions::iceberg {
21+
22+
void registerFunctions(const std::string& prefix = "");
23+
24+
} // namespace facebook::velox::functions::iceberg
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/functions/iceberg/BucketFunction.h"
17+
#include "velox/common/base/tests/GTestUtils.h"
18+
#include "velox/functions/iceberg/tests/IcebergFunctionBaseTest.h"
19+
20+
#include <gtest/gtest.h>
21+
22+
using namespace facebook::velox;
23+
using namespace facebook::velox::exec;
24+
25+
namespace facebook::velox::functions::iceberg {
26+
namespace {
27+
28+
class BucketFunctionTest
29+
: public functions::iceberg::test::IcebergFunctionBaseTest {
30+
public:
31+
BucketFunctionTest() {
32+
options_.parseDecimalAsDouble = false;
33+
}
34+
35+
protected:
36+
template <typename T>
37+
std::optional<int32_t> bucket(
38+
std::optional<int32_t> numBuckets,
39+
std::optional<T> value) {
40+
return evaluateOnce<int32_t>("bucket(c0, c1)", numBuckets, value);
41+
}
42+
43+
template <typename T>
44+
std::optional<int32_t> bucket(
45+
const TypePtr& type,
46+
std::optional<int32_t> numBuckets,
47+
std::optional<T> value) {
48+
return evaluateOnce<int32_t>(
49+
"bucket(c0, c1)", {INTEGER(), type}, numBuckets, value);
50+
}
51+
};
52+
53+
TEST_F(BucketFunctionTest, integerTypes) {
54+
EXPECT_EQ(bucket<int32_t>(10, 8), 3);
55+
EXPECT_EQ(bucket<int32_t>(10, 42), 6);
56+
EXPECT_EQ(bucket<int32_t>(100, 34), 79);
57+
EXPECT_EQ(bucket<int32_t>(1000, INT_MAX), 606);
58+
EXPECT_EQ(bucket<int32_t>(1000, INT_MIN), 856);
59+
EXPECT_EQ(bucket<int64_t>(10, 8), 3);
60+
EXPECT_EQ(bucket<int64_t>(10, 42), 6);
61+
EXPECT_EQ(bucket<int64_t>(100, -34), 97);
62+
EXPECT_EQ(bucket<int64_t>(2, -1), 0);
63+
VELOX_ASSERT_THROW(
64+
bucket<int64_t>(0, 34),
65+
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
66+
VELOX_ASSERT_THROW(
67+
bucket<int64_t>(-3, 34),
68+
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
69+
}
70+
71+
TEST_F(BucketFunctionTest, string) {
72+
EXPECT_EQ(bucket<std::string>(5, "abcdefg"), 4);
73+
EXPECT_EQ(bucket<std::string>(128, "abc"), 122);
74+
EXPECT_EQ(bucket<std::string>(128, "abcd"), 106);
75+
EXPECT_EQ(bucket<std::string>(64, "abcde"), 54);
76+
EXPECT_EQ(bucket<std::string>(12, "测试"), 8);
77+
EXPECT_EQ(bucket<std::string>(16, "测试raul试测"), 1);
78+
EXPECT_EQ(bucket<std::string>(16, ""), 0);
79+
EXPECT_EQ(bucket<std::string>(16, "Товары"), 10);
80+
EXPECT_EQ(bucket<std::string>(120, "😀"), 58);
81+
VELOX_ASSERT_THROW(
82+
bucket<std::string>(0, "abc"),
83+
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
84+
VELOX_ASSERT_THROW(
85+
bucket<std::string>(-3, "abc"),
86+
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
87+
}
88+
89+
TEST_F(BucketFunctionTest, binary) {
90+
EXPECT_EQ(bucket<StringView>(VARBINARY(), 128, "abc"), 122);
91+
EXPECT_EQ(bucket<StringView>(VARBINARY(), 5, "abcdefg"), 4);
92+
}
93+
94+
TEST_F(BucketFunctionTest, timestamp) {
95+
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(1633046400, 0)), 17);
96+
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(0, 123456789)), 0);
97+
EXPECT_EQ(bucket<Timestamp>(20, Timestamp(-62167219200, 0)), 5);
98+
VELOX_ASSERT_THROW(
99+
bucket<Timestamp>(0, Timestamp(-62167219200, 0)),
100+
"Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
101+
VELOX_ASSERT_THROW(
102+
bucket<Timestamp>(-3, Timestamp(-62167219200, 0)),
103+
"Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n");
104+
}
105+
106+
TEST_F(BucketFunctionTest, date) {
107+
EXPECT_EQ(bucket<int32_t>(DATE(), 10, 8), 3);
108+
EXPECT_EQ(bucket<int32_t>(DATE(), 10, 42), 6);
109+
}
110+
111+
TEST_F(BucketFunctionTest, decimal) {
112+
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 64, 1234), 56);
113+
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 18, 1230), 13);
114+
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 16, 12999), 2);
115+
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 128, 5), 85);
116+
EXPECT_EQ(bucket<int64_t>(DECIMAL(9, 2), 18, 5), 3);
117+
118+
EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 12), 7);
119+
EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 0), 7);
120+
EXPECT_EQ(bucket<int128_t>(DECIMAL(38, 2), 10, 234), 6);
121+
EXPECT_EQ(
122+
bucket<int128_t>(DECIMAL(38, 2), 10, DecimalUtil::kLongDecimalMax), 4);
123+
}
124+
125+
} // namespace
126+
} // namespace facebook::velox::functions::iceberg

velox/functions/iceberg/tests/CMakeLists.txt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
add_executable(velox_functions_iceberg_test Murmur3Hash32Test.cpp)
14+
add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp Murmur3Hash32Test.cpp)
1515

1616
add_test(
1717
NAME velox_functions_iceberg_test
@@ -21,8 +21,10 @@ add_test(
2121

2222
target_link_libraries(
2323
velox_functions_iceberg_test
24-
velox_functions_iceberg_hash
24+
velox_functions_iceberg
25+
velox_functions_test_lib
26+
velox_expression
27+
velox_memory
2528
GTest::gtest
2629
GTest::gtest_main
27-
GTest::gmock
2830
)

0 commit comments

Comments
 (0)