From cffeb38eb1fded0a50ef718d63ad23318368837e Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 11 Aug 2025 16:48:48 +0100 Subject: [PATCH 01/11] support iceberg bucket function --- velox/docs/functions/iceberg/functions.rst | 13 +- velox/functions/iceberg/BucketFunction.cpp | 124 ++++++++++++++++++ velox/functions/iceberg/BucketFunction.h | 24 ++++ velox/functions/iceberg/CMakeLists.txt | 10 ++ velox/functions/iceberg/Register.cpp | 26 ++++ velox/functions/iceberg/Register.h | 24 ++++ .../iceberg/tests/BucketFunctionTest.cpp | 123 +++++++++++++++++ velox/functions/iceberg/tests/CMakeLists.txt | 10 +- .../iceberg/tests/IcebergFunctionBaseTest.h | 34 +++++ 9 files changed, 385 insertions(+), 3 deletions(-) create mode 100644 velox/functions/iceberg/BucketFunction.cpp create mode 100644 velox/functions/iceberg/BucketFunction.h create mode 100644 velox/functions/iceberg/Register.cpp create mode 100644 velox/functions/iceberg/Register.h create mode 100644 velox/functions/iceberg/tests/BucketFunctionTest.cpp create mode 100644 velox/functions/iceberg/tests/IcebergFunctionBaseTest.h diff --git a/velox/docs/functions/iceberg/functions.rst b/velox/docs/functions/iceberg/functions.rst index 5673d607dbd4..464af6573276 100644 --- a/velox/docs/functions/iceberg/functions.rst +++ b/velox/docs/functions/iceberg/functions.rst @@ -10,4 +10,15 @@ Refer to `Iceberg documenation integer - Not implemented. + Returns an integer between 0 and ``numBuckets - 1`` representing the bucket assignment. + Bucket partition transforms use a 32-bit hash of the ``input``. The 32-bit hash implementation is the 32-bit Murmur3 hash, x86 variant, seeded with 0. + The hash mod ``numBuckets`` must produce a positive value by first discarding the sign bit of the hash value. + + In pseudo-code, the function is showing as following. 
:: + + def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N + + Argument ``numBuckets`` is of type INTEGER, the ``numBuckets`` must be more than 0, otherwise, throws. + Supported types for ``input`` are INTEGER, BIGINT, DECIMAL, DATE, TIMESTAMP, VARCHAR, VARBINARY. :: + SELECT bucket(128, 'abcd'); -- 106 + SELECT bucket(100, 34L); -- 79 diff --git a/velox/functions/iceberg/BucketFunction.cpp b/velox/functions/iceberg/BucketFunction.cpp new file mode 100644 index 000000000000..40ccb09636a2 --- /dev/null +++ b/velox/functions/iceberg/BucketFunction.cpp @@ -0,0 +1,124 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/functions/iceberg/BucketFunction.h" +#include "velox/common/base/Status.h" +#include "velox/functions/Macros.h" +#include "velox/functions/Registerer.h" +#include "velox/functions/iceberg/Murmur3Hash32.h" +#include "velox/type/Timestamp.h" + +namespace facebook::velox::functions::iceberg { +namespace { + +FOLLY_ALWAYS_INLINE int32_t getBucketIndex(int32_t numBuckets, int32_t hashedValue) { + return (hashedValue & INT_MAX) % numBuckets; +} + +// bucket(numBuckets, decimal) -> bucketIndex +// Hash the minimal bytes of the decimal unscaled value, then MOD numBuckets to +// get the bucket index.
+template +struct BucketDecimalFunction { + VELOX_DEFINE_FUNCTION_TYPES(TExec); + + template + FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) { + VELOX_RETURN_IF( + numBuckets <= 0, + Status::UserError( + "Invalid number of buckets: {} (must be > 0)", numBuckets)); + char bytes[sizeof(int128_t)]; + const auto length = DecimalUtil::toByteArray(input, bytes); + const auto hash = Murmur3Hash32::hashBytes(bytes, length); + out = getBucketIndex(numBuckets, hash); + return Status::OK(); + } +}; + +// bucket(numBuckets, input) -> bucketIndex +// Bucket all other Iceberg supported types including the integral type, +// varchar, varbinary, date and timestamp type. +template +struct BucketFunction { + VELOX_DEFINE_FUNCTION_TYPES(TExec); + + /// Value of type INTEGER and BIGINT is treated as unsigned type. + /// For the schema evolution, may promote int to int64, treat int32 as uint64. + template + FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) { + VELOX_RETURN_IF( + numBuckets <= 0, + Status::UserError( + "Invalid number of buckets: {} (must be > 0)", numBuckets)); + const auto hash = Murmur3Hash32::hashInt64(input); + out = getBucketIndex(numBuckets, hash); + return Status::OK(); + } + + FOLLY_ALWAYS_INLINE Status + call(int32_t& out, int32_t numBuckets, const arg_type& input) { + VELOX_RETURN_IF( + numBuckets <= 0, + Status::UserError( + "Invalid number of buckets: {} (must be > 0)", numBuckets)); + const auto hash = Murmur3Hash32::hashBytes(input.data(), input.size()); + out = getBucketIndex(numBuckets, hash); + return Status::OK(); + } + + FOLLY_ALWAYS_INLINE Status + call(int32_t& out, int32_t numBuckets, const arg_type& input) { + VELOX_RETURN_IF( + numBuckets <= 0, + Status::UserError( + "Invalid number of buckets: {} (must be > 0)", numBuckets)); + const auto hash = Murmur3Hash32::hashInt64(input.toMicros()); + out = getBucketIndex(numBuckets, hash); + return Status::OK(); + } +}; + +} // namespace + +void 
registerBucketFunctions(const std::string& prefix) { + registerFunction( + {prefix + "bucket"}); + registerFunction( + {prefix + "bucket"}); + registerFunction( + {prefix + "bucket"}); + registerFunction( + {prefix + "bucket"}); + registerFunction({prefix + "bucket"}); + registerFunction( + {prefix + "bucket"}); + + + registerFunction< + BucketDecimalFunction, + int32_t, + int32_t, + LongDecimal>({prefix + "bucket"}); + + registerFunction< + BucketDecimalFunction, + int32_t, + int32_t, + ShortDecimal>({prefix + "bucket"}); +} + +} // namespace facebook::velox::functions::iceberg diff --git a/velox/functions/iceberg/BucketFunction.h b/velox/functions/iceberg/BucketFunction.h new file mode 100644 index 000000000000..2744e4e4a80f --- /dev/null +++ b/velox/functions/iceberg/BucketFunction.h @@ -0,0 +1,24 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include <string> + +namespace facebook::velox::functions::iceberg { + +void registerBucketFunctions(const std::string& prefix); + +} // namespace facebook::velox::functions::iceberg diff --git a/velox/functions/iceberg/CMakeLists.txt b/velox/functions/iceberg/CMakeLists.txt index 959cfe5eb16e..a0390c877cb8 100644 --- a/velox/functions/iceberg/CMakeLists.txt +++ b/velox/functions/iceberg/CMakeLists.txt @@ -16,6 +16,16 @@ velox_add_library(velox_functions_iceberg_hash Murmur3Hash32.cpp) velox_link_libraries(velox_functions_iceberg_hash velox_common_base velox_functions_util) +velox_add_library(velox_functions_iceberg BucketFunction.cpp Register.cpp) + +velox_link_libraries(velox_functions_iceberg velox_functions_iceberg_hash + velox_functions_lib Folly::folly) + +if(NOT VELOX_MONO_LIBRARY) + set_property(TARGET velox_functions_iceberg PROPERTY JOB_POOL_COMPILE + high_memory_pool) +endif() + if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() diff --git a/velox/functions/iceberg/Register.cpp b/velox/functions/iceberg/Register.cpp new file mode 100644 index 000000000000..026389ea5cb8 --- /dev/null +++ b/velox/functions/iceberg/Register.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "velox/functions/iceberg/Register.h" +#include "velox/functions/iceberg/BucketFunction.h" + +namespace facebook::velox::functions::iceberg { + +void registerFunctions(const std::string& prefix) { + registerBucketFunctions(prefix); +} + +} // namespace facebook::velox::functions::iceberg diff --git a/velox/functions/iceberg/Register.h b/velox/functions/iceberg/Register.h new file mode 100644 index 000000000000..117d405535e3 --- /dev/null +++ b/velox/functions/iceberg/Register.h @@ -0,0 +1,24 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> + +namespace facebook::velox::functions::iceberg { + +void registerFunctions(const std::string& prefix = ""); + +} // namespace facebook::velox::functions::iceberg diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp new file mode 100644 index 000000000000..24c8d9c58010 --- /dev/null +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/functions/iceberg/BucketFunction.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/functions/iceberg/tests/IcebergFunctionBaseTest.h" + +#include + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::functions::iceberg; + +namespace { + +class BucketFunctionTest + : public functions::iceberg::test::IcebergFunctionBaseTest { + public: + BucketFunctionTest() { + options_.parseDecimalAsDouble = false; + } + + protected: + template + std::optional bucket( + std::optional numBuckets, + std::optional value) { + return evaluateOnce("bucket(c0, c1)", numBuckets, value); + } + + void assertBucket( + const VectorPtr& expected, + const std::vector& input) { + auto result = + evaluate>("bucket(c0, c1)", makeRowVector(input)); + facebook::velox::test::assertEqualVectors(expected, result); + } +}; + +} // namespace + +TEST_F(BucketFunctionTest, integerTypes) { + EXPECT_EQ(bucket(10, 8), 3); + EXPECT_EQ(bucket(10, 42), 6); + EXPECT_EQ(bucket(100, 34), 79); + EXPECT_EQ(bucket(1000, INT_MAX), 606); + EXPECT_EQ(bucket(1000, INT_MIN), 856); + EXPECT_EQ(bucket(10, 8), 3); + EXPECT_EQ(bucket(10, 42), 6); + EXPECT_EQ(bucket(100, -34), 97); + EXPECT_EQ(bucket(2, -1), 0); + VELOX_ASSERT_THROW( + bucket(0, 34), "Invalid number of buckets: 0 (must be > 0)"); + VELOX_ASSERT_THROW( + bucket(-3, 34), "Invalid number of buckets: -3 (must be > 0)"); +} + +TEST_F(BucketFunctionTest, string) { + EXPECT_EQ(bucket(5, "abcdefg"), 4); + EXPECT_EQ(bucket(128, "abc"), 122); + 
EXPECT_EQ(bucket(128, "abcd"), 106); + EXPECT_EQ(bucket(64, "abcde"), 54); + EXPECT_EQ(bucket(12, "测试"), 8); + EXPECT_EQ(bucket(16, "测试raul试测"), 1); + EXPECT_EQ(bucket(16, ""), 0); + EXPECT_EQ(bucket(16, "Товары"), 10); + EXPECT_EQ(bucket(120, "😀"), 58); + VELOX_ASSERT_THROW( + bucket(0, "abc"), + "Invalid number of buckets: 0 (must be > 0)"); + VELOX_ASSERT_THROW( + bucket(-3, "abc"), + "Invalid number of buckets: -3 (must be > 0)"); +} + +TEST_F(BucketFunctionTest, binary) { + assertBucket( + makeFlatVector({122, 4}), + {makeFlatVector({128, 5}), + makeFlatVector({"abc", "abcdefg"}, VARBINARY())}); +} + +TEST_F(BucketFunctionTest, timestamp) { + EXPECT_EQ(bucket(20, Timestamp(1633046400, 0)), 17); + EXPECT_EQ(bucket(20, Timestamp(0, 123456789)), 0); + EXPECT_EQ(bucket(20, Timestamp(-62167219200, 0)), 5); + VELOX_ASSERT_THROW( + bucket(0, Timestamp(-62167219200, 0)), + "Invalid number of buckets: 0 (must be > 0)"); + VELOX_ASSERT_THROW( + bucket(-3, Timestamp(-62167219200, 0)), + "Invalid number of buckets: -3 (must be > 0)"); +} + +TEST_F(BucketFunctionTest, date) { + assertBucket( + makeFlatVector({3, 6}), + {makeConstant(10, 2), makeFlatVector({8, 42}, DATE())}); +} + +TEST_F(BucketFunctionTest, decimal) { + assertBucket( + makeFlatVector({56, 13, 2, 85, 3}), + {makeFlatVector({64, 18, 16, 128, 18}), + makeFlatVector({1234, 1230, 12999, 5, 5, 5}, DECIMAL(9, 2))}); + assertBucket( + makeFlatVector({7, 7, 6, 4}), + {makeConstant(10, 4), + makeFlatVector( + {12, 0, 234, DecimalUtil::kLongDecimalMax}, DECIMAL(38, 2))}); +} diff --git a/velox/functions/iceberg/tests/CMakeLists.txt b/velox/functions/iceberg/tests/CMakeLists.txt index 60e561051217..5924ae3c05a8 100644 --- a/velox/functions/iceberg/tests/CMakeLists.txt +++ b/velox/functions/iceberg/tests/CMakeLists.txt @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-add_executable(velox_functions_iceberg_test Murmur3Hash32Test.cpp) +add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp Murmur3Hash32Test.cpp) add_test( NAME velox_functions_iceberg_test @@ -21,7 +21,13 @@ add_test( target_link_libraries( velox_functions_iceberg_test - velox_functions_iceberg_hash + velox_functions_iceberg + velox_functions_test_lib + velox_is_null_functions + velox_exec_test_lib + velox_expression + velox_memory + velox_dwio_common_test_utils GTest::gtest GTest::gtest_main GTest::gmock diff --git a/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h b/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h new file mode 100644 index 000000000000..f38501a8172f --- /dev/null +++ b/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "velox/functions/iceberg/Register.h" +#include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h" +#include "velox/parse/TypeResolver.h" + +namespace facebook::velox::functions::iceberg::test { + +class IcebergFunctionBaseTest + : public facebook::velox::functions::test::FunctionBaseTest { + protected: + static void SetUpTestCase() { + parse::registerTypeResolver(); + registerFunctions(""); + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } +}; + +} // namespace facebook::velox::functions::iceberg::test From 44a03c1751b5e91df801ad6d96ec39870f8b2736 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 11 Aug 2025 17:56:01 +0100 Subject: [PATCH 02/11] fix code style --- velox/functions/iceberg/BucketFunction.cpp | 4 ++-- velox/functions/iceberg/tests/CMakeLists.txt | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/velox/functions/iceberg/BucketFunction.cpp b/velox/functions/iceberg/BucketFunction.cpp index 40ccb09636a2..91598b1db6a7 100644 --- a/velox/functions/iceberg/BucketFunction.cpp +++ b/velox/functions/iceberg/BucketFunction.cpp @@ -24,7 +24,8 @@ namespace facebook::velox::functions::iceberg { namespace { -FOLLY_ALWAYS_INLINE int32_t getBucketIndex(int32_t numBuckets, int32_t hashedValue) { +FOLLY_ALWAYS_INLINE int32_t +getBucketIndex(int32_t numBuckets, int32_t hashedValue) { return (hashedValue & INT_MAX) % numBuckets; } @@ -107,7 +108,6 @@ void registerBucketFunctions(const std::string& prefix) { registerFunction( {prefix + "bucket"}); - registerFunction< BucketDecimalFunction, int32_t, diff --git a/velox/functions/iceberg/tests/CMakeLists.txt b/velox/functions/iceberg/tests/CMakeLists.txt index 5924ae3c05a8..93621ab57ffa 100644 --- a/velox/functions/iceberg/tests/CMakeLists.txt +++ b/velox/functions/iceberg/tests/CMakeLists.txt @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp Murmur3Hash32Test.cpp) +add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp + Murmur3Hash32Test.cpp) add_test( NAME velox_functions_iceberg_test From 94b807cbe6e766ed1143f90df3444b0bf7cd9231 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Thu, 14 Aug 2025 15:53:39 +0100 Subject: [PATCH 03/11] address comments --- velox/docs/functions/iceberg/functions.rst | 10 +++++----- velox/functions/iceberg/BucketFunction.cpp | 6 ++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/velox/docs/functions/iceberg/functions.rst b/velox/docs/functions/iceberg/functions.rst index 464af6573276..622bc269650c 100644 --- a/velox/docs/functions/iceberg/functions.rst +++ b/velox/docs/functions/iceberg/functions.rst @@ -10,15 +10,15 @@ Refer to `Iceberg documenation integer - Returns an integer between 0 and ``numBuckets - 1`` representing the bucket assignment. - Bucket partition transforms use a 32-bit hash of the ``input``. The 32-bit hash implementation is the 32-bit Murmur3 hash, x86 variant, seeded with 0. - The hash mod ``numBuckets`` must produce a positive value by first discarding the sign bit of the hash value. + Returns an integer between 0 and numBuckets - 1, indicating the assigned bucket. + Bucket partitioning is based on a 32-bit hash of the input, specifically using the x86 + variant of the Murmur3 hash function with a seed of 0. In pseudo-code, the function is showing as following. :: - def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N + def bucket(numBuckets, input) = (murmur3_x86_32_hash(input) & Integer.MAX_VALUE) % numBuckets - Argument ``numBuckets`` is of type INTEGER, the ``numBuckets`` must be more than 0, otherwise, throws. + The ``numBuckets`` is of type INTEGER and must be greater than 0. Otherwise, an exception is thrown.
Supported types for ``input`` are INTEGER, BIGINT, DECIMAL, DATE, TIMESTAMP, VARCHAR, VARBINARY. :: SELECT bucket(128, 'abcd'); -- 106 SELECT bucket(100, 34L); -- 79 diff --git a/velox/functions/iceberg/BucketFunction.cpp b/velox/functions/iceberg/BucketFunction.cpp index 91598b1db6a7..766dbaa993e8 100644 --- a/velox/functions/iceberg/BucketFunction.cpp +++ b/velox/functions/iceberg/BucketFunction.cpp @@ -15,11 +15,9 @@ */ #include "velox/functions/iceberg/BucketFunction.h" -#include "velox/common/base/Status.h" #include "velox/functions/Macros.h" #include "velox/functions/Registerer.h" #include "velox/functions/iceberg/Murmur3Hash32.h" -#include "velox/type/Timestamp.h" namespace facebook::velox::functions::iceberg { namespace { @@ -57,8 +55,8 @@ template struct BucketFunction { VELOX_DEFINE_FUNCTION_TYPES(TExec); - /// Value of type INTEGER and BIGINT is treated as unsigned type. - /// For the schema evolution, may promote int to int64, treat int32 as uint64. + // Value of type INTEGER and BIGINT is treated as unsigned type. + // For the schema evolution, may promote int to int64, treat int32 as uint64.
template FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) { VELOX_RETURN_IF( From e148717192d6bf4324c2d32e8240ee176fae2b29 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 18 Aug 2025 16:56:10 +0100 Subject: [PATCH 04/11] address comments --- velox/docs/functions/iceberg/functions.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/velox/docs/functions/iceberg/functions.rst b/velox/docs/functions/iceberg/functions.rst index 622bc269650c..768f79b43a55 100644 --- a/velox/docs/functions/iceberg/functions.rst +++ b/velox/docs/functions/iceberg/functions.rst @@ -14,7 +14,7 @@ Refer to `Iceberg documenation Date: Wed, 27 Aug 2025 11:40:21 +0100 Subject: [PATCH 05/11] Use new API --- velox/functions/iceberg/BucketFunction.cpp | 20 ++++--------------- .../iceberg/tests/BucketFunctionTest.cpp | 14 +++++++------ 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/velox/functions/iceberg/BucketFunction.cpp b/velox/functions/iceberg/BucketFunction.cpp index 766dbaa993e8..d123d4c4f504 100644 --- a/velox/functions/iceberg/BucketFunction.cpp +++ b/velox/functions/iceberg/BucketFunction.cpp @@ -36,10 +36,7 @@ struct BucketDecimalFunction { template FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) { - VELOX_RETURN_IF( - numBuckets <= 0, - Status::UserError( - "Invalid number of buckets: {} (must be > 0)", numBuckets)); + VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets."); char bytes[sizeof(int128_t)]; const auto length = DecimalUtil::toByteArray(input, bytes); const auto hash = Murmur3Hash32::hashBytes(bytes, length); @@ -59,10 +56,7 @@ struct BucketFunction { // For the schema evolution, may promote int to int64, treat int32 as uint64. 
template FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, T input) { - VELOX_RETURN_IF( - numBuckets <= 0, - Status::UserError( - "Invalid number of buckets: {} (must be > 0)", numBuckets)); + VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets."); const auto hash = Murmur3Hash32::hashInt64(input); out = getBucketIndex(numBuckets, hash); return Status::OK(); @@ -70,10 +64,7 @@ struct BucketFunction { FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, const arg_type& input) { - VELOX_RETURN_IF( - numBuckets <= 0, - Status::UserError( - "Invalid number of buckets: {} (must be > 0)", numBuckets)); + VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets."); const auto hash = Murmur3Hash32::hashBytes(input.data(), input.size()); out = getBucketIndex(numBuckets, hash); return Status::OK(); @@ -81,10 +72,7 @@ struct BucketFunction { FOLLY_ALWAYS_INLINE Status call(int32_t& out, int32_t numBuckets, const arg_type& input) { - VELOX_RETURN_IF( - numBuckets <= 0, - Status::UserError( - "Invalid number of buckets: {} (must be > 0)", numBuckets)); + VELOX_USER_RETURN_LE(numBuckets, 0, "Invalid number of buckets."); const auto hash = Murmur3Hash32::hashInt64(input.toMicros()); out = getBucketIndex(numBuckets, hash); return Status::OK(); diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp index 24c8d9c58010..500b9bec5031 100644 --- a/velox/functions/iceberg/tests/BucketFunctionTest.cpp +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -62,9 +62,11 @@ TEST_F(BucketFunctionTest, integerTypes) { EXPECT_EQ(bucket(100, -34), 97); EXPECT_EQ(bucket(2, -1), 0); VELOX_ASSERT_THROW( - bucket(0, 34), "Invalid number of buckets: 0 (must be > 0)"); + bucket(0, 34), + "Reason: (0 vs. 
0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); VELOX_ASSERT_THROW( - bucket(-3, 34), "Invalid number of buckets: -3 (must be > 0)"); + bucket(-3, 34), + "Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); } TEST_F(BucketFunctionTest, string) { @@ -79,10 +81,10 @@ TEST_F(BucketFunctionTest, string) { EXPECT_EQ(bucket(120, "😀"), 58); VELOX_ASSERT_THROW( bucket(0, "abc"), - "Invalid number of buckets: 0 (must be > 0)"); + "Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); VELOX_ASSERT_THROW( bucket(-3, "abc"), - "Invalid number of buckets: -3 (must be > 0)"); + "Reason: (-3 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); } TEST_F(BucketFunctionTest, binary) { @@ -98,10 +100,10 @@ TEST_F(BucketFunctionTest, timestamp) { EXPECT_EQ(bucket(20, Timestamp(-62167219200, 0)), 5); VELOX_ASSERT_THROW( bucket(0, Timestamp(-62167219200, 0)), - "Invalid number of buckets: 0 (must be > 0)"); + "Reason: (0 vs. 0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); VELOX_ASSERT_THROW( bucket(-3, Timestamp(-62167219200, 0)), - "Invalid number of buckets: -3 (must be > 0)"); + "Reason: (-3 vs. 
0) Invalid number of buckets.\nExpression: numBuckets <= 0\n"); } TEST_F(BucketFunctionTest, date) { From 23cd7578064eb10d13df453aa660ff44bea78c8d Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 27 Aug 2025 17:15:58 +0100 Subject: [PATCH 06/11] Use simple evaluateOnce API to test --- velox/functions/iceberg/BucketFunction.cpp | 2 +- .../iceberg/tests/BucketFunctionTest.cpp | 44 +++++++++---------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/velox/functions/iceberg/BucketFunction.cpp b/velox/functions/iceberg/BucketFunction.cpp index d123d4c4f504..39f2f14b433d 100644 --- a/velox/functions/iceberg/BucketFunction.cpp +++ b/velox/functions/iceberg/BucketFunction.cpp @@ -24,7 +24,7 @@ namespace { FOLLY_ALWAYS_INLINE int32_t getBucketIndex(int32_t numBuckets, int32_t hashedValue) { - return (hashedValue & INT_MAX) % numBuckets; + return (hashedValue & std::numeric_limits::max()) % numBuckets; } // bucket(numBuckets, decimal) -> bucketIndex diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp index 500b9bec5031..efa768f5324b 100644 --- a/velox/functions/iceberg/tests/BucketFunctionTest.cpp +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -40,12 +40,13 @@ class BucketFunctionTest return evaluateOnce("bucket(c0, c1)", numBuckets, value); } - void assertBucket( - const VectorPtr& expected, - const std::vector& input) { - auto result = - evaluate>("bucket(c0, c1)", makeRowVector(input)); - facebook::velox::test::assertEqualVectors(expected, result); + template + std::optional bucket( + const TypePtr& type, + std::optional numBuckets, + std::optional value) { + return evaluateOnce( + "bucket(c0, c1)", {INTEGER(), type}, numBuckets, value); } }; @@ -88,10 +89,8 @@ TEST_F(BucketFunctionTest, string) { } TEST_F(BucketFunctionTest, binary) { - assertBucket( - makeFlatVector({122, 4}), - {makeFlatVector({128, 5}), - makeFlatVector({"abc", "abcdefg"}, VARBINARY())}); + 
EXPECT_EQ(bucket(VARBINARY(), 128, "abc"), 122); + EXPECT_EQ(bucket(VARBINARY(), 5, "abcdefg"), 4); } TEST_F(BucketFunctionTest, timestamp) { @@ -107,19 +106,20 @@ TEST_F(BucketFunctionTest, timestamp) { } TEST_F(BucketFunctionTest, date) { - assertBucket( - makeFlatVector({3, 6}), - {makeConstant(10, 2), makeFlatVector({8, 42}, DATE())}); + EXPECT_EQ(bucket(DATE(), 10, 8), 3); + EXPECT_EQ(bucket(DATE(), 10, 42), 6); } TEST_F(BucketFunctionTest, decimal) { - assertBucket( - makeFlatVector({56, 13, 2, 85, 3}), - {makeFlatVector({64, 18, 16, 128, 18}), - makeFlatVector({1234, 1230, 12999, 5, 5, 5}, DECIMAL(9, 2))}); - assertBucket( - makeFlatVector({7, 7, 6, 4}), - {makeConstant(10, 4), - makeFlatVector( - {12, 0, 234, DecimalUtil::kLongDecimalMax}, DECIMAL(38, 2))}); + EXPECT_EQ(bucket(DECIMAL(9, 2), 64, 1234), 56); + EXPECT_EQ(bucket(DECIMAL(9, 2), 18, 1230), 13); + EXPECT_EQ(bucket(DECIMAL(9, 2), 16, 12999), 2); + EXPECT_EQ(bucket(DECIMAL(9, 2), 128, 5), 85); + EXPECT_EQ(bucket(DECIMAL(9, 2), 18, 5), 3); + + EXPECT_EQ(bucket(DECIMAL(38, 2), 10, 12), 7); + EXPECT_EQ(bucket(DECIMAL(38, 2), 10, 0), 7); + EXPECT_EQ(bucket(DECIMAL(38, 2), 10, 234), 6); + EXPECT_EQ( + bucket(DECIMAL(38, 2), 10, DecimalUtil::kLongDecimalMax), 4); } From dfcd4a52edf6bded366e801270b2f83709e27457 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 27 Aug 2025 18:04:16 +0100 Subject: [PATCH 07/11] Remove duplicate library --- velox/functions/iceberg/tests/CMakeLists.txt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/velox/functions/iceberg/tests/CMakeLists.txt b/velox/functions/iceberg/tests/CMakeLists.txt index 93621ab57ffa..360de666f8d6 100644 --- a/velox/functions/iceberg/tests/CMakeLists.txt +++ b/velox/functions/iceberg/tests/CMakeLists.txt @@ -24,12 +24,8 @@ target_link_libraries( velox_functions_iceberg_test velox_functions_iceberg velox_functions_test_lib - velox_is_null_functions - velox_exec_test_lib velox_expression velox_memory - velox_dwio_common_test_utils 
GTest::gtest GTest::gtest_main - GTest::gmock ) From d6555493c237d0fb0450303eec6ae443472d800a Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 29 Aug 2025 10:10:28 +0100 Subject: [PATCH 08/11] fix code style --- velox/functions/iceberg/CMakeLists.txt | 11 +++++++---- velox/functions/iceberg/tests/CMakeLists.txt | 3 +-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/velox/functions/iceberg/CMakeLists.txt b/velox/functions/iceberg/CMakeLists.txt index a0390c877cb8..6dac7e5bdab2 100644 --- a/velox/functions/iceberg/CMakeLists.txt +++ b/velox/functions/iceberg/CMakeLists.txt @@ -18,12 +18,15 @@ velox_link_libraries(velox_functions_iceberg_hash velox_common_base velox_functi velox_add_library(velox_functions_iceberg BucketFunction.cpp Register.cpp) -velox_link_libraries(velox_functions_iceberg velox_functions_iceberg_hash - velox_functions_lib Folly::folly) +velox_link_libraries( + velox_functions_iceberg + velox_functions_iceberg_hash + velox_functions_lib + Folly::folly +) if(NOT VELOX_MONO_LIBRARY) - set_property(TARGET velox_functions_iceberg PROPERTY JOB_POOL_COMPILE - high_memory_pool) + set_property(TARGET velox_functions_iceberg PROPERTY JOB_POOL_COMPILE high_memory_pool) endif() if(${VELOX_BUILD_TESTING}) diff --git a/velox/functions/iceberg/tests/CMakeLists.txt b/velox/functions/iceberg/tests/CMakeLists.txt index 360de666f8d6..133f73984009 100644 --- a/velox/functions/iceberg/tests/CMakeLists.txt +++ b/velox/functions/iceberg/tests/CMakeLists.txt @@ -11,8 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp - Murmur3Hash32Test.cpp) +add_executable(velox_functions_iceberg_test BucketFunctionTest.cpp Murmur3Hash32Test.cpp) add_test( NAME velox_functions_iceberg_test From 257c8e4e8e9e2cdf8eb04390ac477308025bd57e Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Fri, 29 Aug 2025 13:12:31 +0100 Subject: [PATCH 09/11] address comments --- velox/functions/iceberg/tests/BucketFunctionTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp index efa768f5324b..767e548ef237 100644 --- a/velox/functions/iceberg/tests/BucketFunctionTest.cpp +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -50,8 +50,6 @@ class BucketFunctionTest } }; -} // namespace - TEST_F(BucketFunctionTest, integerTypes) { EXPECT_EQ(bucket(10, 8), 3); EXPECT_EQ(bucket(10, 42), 6); @@ -123,3 +121,5 @@ TEST_F(BucketFunctionTest, decimal) { EXPECT_EQ( bucket(DECIMAL(38, 2), 10, DecimalUtil::kLongDecimalMax), 4); } + +} // namespace From f1ce22fbd7b96658857dc472dbfef7b5f5c1d137 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 1 Sep 2025 10:08:17 +0100 Subject: [PATCH 10/11] address comments --- velox/functions/iceberg/tests/BucketFunctionTest.cpp | 2 ++ velox/functions/iceberg/tests/IcebergFunctionBaseTest.h | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp index 767e548ef237..768984e9693c 100644 --- a/velox/functions/iceberg/tests/BucketFunctionTest.cpp +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -23,6 +23,7 @@ using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::functions::iceberg; +namespace facebook::velox::functions::iceberg { namespace { class BucketFunctionTest @@ -123,3 +124,4 @@ 
TEST_F(BucketFunctionTest, decimal) { } } // namespace +} // namespace facebook::velox::functions::iceberg diff --git a/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h b/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h index f38501a8172f..1a152b153599 100644 --- a/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h +++ b/velox/functions/iceberg/tests/IcebergFunctionBaseTest.h @@ -25,9 +25,8 @@ class IcebergFunctionBaseTest : public facebook::velox::functions::test::FunctionBaseTest { protected: static void SetUpTestCase() { - parse::registerTypeResolver(); + FunctionBaseTest::SetUpTestCase(); registerFunctions(""); - memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); } }; From 8d59d01b4352b4060bc961ed9ad30dd42eff9172 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 1 Sep 2025 10:12:00 +0100 Subject: [PATCH 11/11] remove unused using namespace --- velox/functions/iceberg/tests/BucketFunctionTest.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/velox/functions/iceberg/tests/BucketFunctionTest.cpp b/velox/functions/iceberg/tests/BucketFunctionTest.cpp index 768984e9693c..3de2f66be341 100644 --- a/velox/functions/iceberg/tests/BucketFunctionTest.cpp +++ b/velox/functions/iceberg/tests/BucketFunctionTest.cpp @@ -21,7 +21,6 @@ using namespace facebook::velox; using namespace facebook::velox::exec; -using namespace facebook::velox::functions::iceberg; namespace facebook::velox::functions::iceberg { namespace {