diff --git a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt index 6f558d96bae20..0e04e97b34e2e 100644 --- a/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt @@ -11,6 +11,7 @@ # limitations under the License. add_library(presto_connectors IcebergPrestoToVeloxConnector.cpp + PrestoToVeloxConnectorUtils.cpp Registration.cpp PrestoToVeloxConnector.cpp SystemConnector.cpp) diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp index 923801aae82f3..9a787838037ef 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp @@ -13,6 +13,7 @@ */ #include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" #include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" @@ -42,6 +43,76 @@ velox::dwio::common::FileFormat toVeloxFileFormat( VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format)); } +std::unique_ptr toIcebergTableHandle( + const protocol::TupleDomain& domainPredicate, + const std::shared_ptr& remainingPredicate, + bool isPushdownFilterEnabled, + const std::string& tableName, + const protocol::List& dataColumns, + const protocol::TableHandle& tableHandle, + const protocol::Map& tableParameters, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser) { + velox::common::SubfieldFilters subfieldFilters; + auto domains = domainPredicate.domains; + for (const auto& domain : *domains) { + auto filter = domain.second; + subfieldFilters[velox::common::Subfield(domain.first)] = + toFilter(domain.second, exprConverter, typeParser); + } + + auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate); + if (auto constant = + std::dynamic_pointer_cast( + remainingFilter)) { + bool value = constant->value().value(); + VELOX_CHECK(value, "Unexpected always-false remaining predicate"); + + // Use null for always-true filter. + remainingFilter = nullptr; + } + + velox::RowTypePtr finalDataColumns; + if (!dataColumns.empty()) { + std::vector names; + std::vector types; + velox::type::fbhive::HiveTypeParser hiveTypeParser; + names.reserve(dataColumns.size()); + types.reserve(dataColumns.size()); + for (auto& column : dataColumns) { + // For iceberg, the column name should be consistent with + // names in iceberg manifest file. The names in iceberg + // manifest file are consistent with the field names in + // parquet data file. + names.emplace_back(column.name); + auto parsedType = hiveTypeParser.parse(column.type); + // The type from the metastore may have upper case letters + // in field names, convert them all to lower case to be + // compatible with Presto. + types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, parsedType->kind(), parsedType)); + } + finalDataColumns = ROW(std::move(names), std::move(types)); + } + + std::unordered_map finalTableParameters = {}; + if (!tableParameters.empty()) { + finalTableParameters.reserve(tableParameters.size()); + for (const auto& [key, value] : tableParameters) { + finalTableParameters[key] = value; + } + } + + return std::make_unique( + tableHandle.connectorId, + tableName, + isPushdownFilterEnabled, + std::move(subfieldFilters), + remainingFilter, + finalDataColumns, + finalTableParameters); +} + } // namespace std::unique_ptr @@ -186,7 +257,7 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle( icebergTableHandle->schemaName, icebergTableHandle->icebergTableName.tableName); - return toHiveTableHandle( + return toIcebergTableHandle( icebergLayout->domainPredicate, icebergLayout->remainingPredicate, icebergLayout->pushdownFilterEnabled, diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp index b14ba77cc251f..55540a703533b 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp @@ -13,6 +13,7 @@ */ #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" +#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" #include "presto_cpp/main/types/PrestoToVeloxExpr.h" #include "presto_cpp/main/types/TypeParser.h" #include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h" @@ -104,658 +105,6 @@ std::string toJsonString(const T& value) { return ((json)value).dump(); } -template -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - return type; -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& elementType = type->childAt(0); - return std::make_shared(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, elementType->kind(), elementType)); -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& keyType = type->childAt(0); - auto& valueType = type->childAt(1); - return std::make_shared( - VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, keyType->kind(), keyType), - VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, valueType->kind(), valueType)); -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& rowType = type->asRow(); - std::vector names; - std::vector types; - names.reserve(type->size()); - types.reserve(type->size()); - for (int i = 0; i < rowType.size(); i++) { - std::string name = rowType.nameOf(i); - folly::toLowerAscii(name); - names.push_back(std::move(name)); - auto& childType = rowType.childAt(i); - types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, childType->kind(), childType)); - } - return std::make_shared(std::move(names), std::move(types)); -} - -int64_t toInt64( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return VariantConverter::convert(value) - .value(); -} - -int128_t toInt128( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -Timestamp toTimestamp( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - const auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -int64_t dateToInt64( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -template -T toFloatingPoint( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto variant = exprConverter.getConstantValue(type, *block); - return variant.value(); -} - -std::string toString( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - if (type->isVarbinary()) { - return value.value(); - } - return value.value(); -} - -bool toBoolean( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto variant = exprConverter.getConstantValue(type, *block); - return variant.value(); -} - -std::unique_ptr bigintRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded ? std::numeric_limits::min() - : toInt64(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : toInt64(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr hugeintRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded ? std::numeric_limits::min() - : toInt128(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : toInt128(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr timestampRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - const bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded - ? std::numeric_limits::min() - : toTimestamp(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - ++low; - } - - const bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : toTimestamp(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - --high; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr boolRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - - if (!lowUnbounded && !highUnbounded) { - bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); - bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); - VELOX_CHECK_EQ( - lowValue, - highValue, - "Boolean range should not be [FALSE, TRUE] after coordinator " - "optimization."); - return std::make_unique(lowValue, nullAllowed); - } - // Presto coordinator has made optimizations to the bool range already. For - // example, [FALSE, TRUE) will be optimized and shown here as (-infinity, - // TRUE). Plus (-infinity, +infinity) case has been guarded in toFilter() - // method, here it can only be one side bounded scenarios. - VELOX_CHECK_NE( - lowUnbounded, - highUnbounded, - "Passed in boolean range can only have one side bounded range scenario"); - if (!lowUnbounded) { - VELOX_CHECK( - highUnbounded, - "Boolean range should not be double side bounded after coordinator " - "optimization."); - bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); - - // (TRUE, +infinity) case, should resolve to filter all - if (lowExclusive && lowValue) { - if (nullAllowed) { - return std::make_unique(); - } - return std::make_unique(); - } - - // Both cases (FALSE, +infinity) or [TRUE, +infinity) should evaluate to - // true. Case [FALSE, +infinity) should not be expected - VELOX_CHECK( - !(!lowExclusive && !lowValue), - "Case [FALSE, +infinity) should " - "not be expected"); - return std::make_unique(true, nullAllowed); - } - if (!highUnbounded) { - VELOX_CHECK( - lowUnbounded, - "Boolean range should not be double side bounded after coordinator " - "optimization."); - bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); - - // (-infinity, FALSE) case, should resolve to filter all - if (highExclusive && !highValue) { - if (nullAllowed) { - return std::make_unique(); - } - return std::make_unique(); - } - - // Both cases (-infinity, TRUE) or (-infinity, FALSE] should evaluate to - // false. Case (-infinity, TRUE] should not be expected - VELOX_CHECK( - !(!highExclusive && highValue), - "Case (-infinity, TRUE] should " - "not be expected"); - return std::make_unique(false, nullAllowed); - } - VELOX_UNREACHABLE(); -} - -template -std::unique_ptr floatingPointRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - auto low = lowUnbounded - ? (-1.0 * std::numeric_limits::infinity()) - : toFloatingPoint(range.low.valueBlock, exprConverter, type); - - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - auto high = highUnbounded - ? std::numeric_limits::infinity() - : toFloatingPoint(range.high.valueBlock, exprConverter, type); - - // Handle NaN cases as NaN is not supported as a limit in Velox Filters - if (!lowUnbounded && std::isnan(low)) { - if (lowExclusive) { - // x > NaN is always false as NaN is considered the largest value. - return std::make_unique(); - } - // Equivalent to x > infinity as only NaN is greater than infinity - // Presto currently converts x >= NaN into the filter with domain - // [NaN, max), so ignoring the high value is fine. - low = std::numeric_limits::infinity(); - lowExclusive = true; - high = std::numeric_limits::infinity(); - highUnbounded = true; - highExclusive = false; - } else if (!highUnbounded && std::isnan(high)) { - high = std::numeric_limits::infinity(); - if (highExclusive) { - // equivalent to x in [low , infinity] or (low , infinity] - highExclusive = false; - } else { - if (lowUnbounded) { - // Anything <= NaN is true as NaN is the largest possible value. - return std::make_unique(); - } - // Equivalent to x > low or x >=low - highUnbounded = true; - } - } - - return std::make_unique>( - low, - lowUnbounded, - lowExclusive, - high, - highUnbounded, - highExclusive, - nullAllowed); -} - -std::unique_ptr varcharRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - auto low = - lowUnbounded ? "" : toString(range.low.valueBlock, exprConverter, type); - - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - auto high = - highUnbounded ? "" : toString(range.high.valueBlock, exprConverter, type); - return std::make_unique( - low, - lowUnbounded, - lowExclusive, - high, - highUnbounded, - highExclusive, - nullAllowed); -} - -std::unique_ptr dateRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded - ? std::numeric_limits::min() - : dateToInt64(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : dateToInt64(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr combineIntegerRanges( - std::vector>& bigintFilters, - bool nullAllowed) { - bool allSingleValue = std::all_of( - bigintFilters.begin(), bigintFilters.end(), [](const auto& range) { - return range->isSingleValue(); - }); - - if (allSingleValue) { - std::vector values; - values.reserve(bigintFilters.size()); - for (const auto& filter : bigintFilters) { - values.emplace_back(filter->lower()); - } - return common::createBigintValues(values, nullAllowed); - } - - if (bigintFilters.size() == 2 && - bigintFilters[0]->lower() == std::numeric_limits::min() && - bigintFilters[1]->upper() == std::numeric_limits::max()) { - assert(bigintFilters[0]->upper() + 1 <= bigintFilters[1]->lower() - 1); - return std::make_unique( - bigintFilters[0]->upper() + 1, - bigintFilters[1]->lower() - 1, - nullAllowed); - } - - bool allNegatedValues = true; - bool foundMaximum = false; - assert(bigintFilters.size() > 1); // true by size checks on ranges - std::vector rejectedValues; - - // check if int64 min is a rejected value - if (bigintFilters[0]->lower() == std::numeric_limits::min() + 1) { - rejectedValues.emplace_back(std::numeric_limits::min()); - } - if (bigintFilters[0]->lower() > std::numeric_limits::min() + 1) { - // too many value at the lower end, bail out - return std::make_unique( - std::move(bigintFilters), nullAllowed); - } - rejectedValues.push_back(bigintFilters[0]->upper() + 1); - for (int i = 1; i < bigintFilters.size(); ++i) { - if (bigintFilters[i]->lower() != bigintFilters[i - 1]->upper() + 2) { - allNegatedValues = false; - break; - } - if (bigintFilters[i]->upper() == std::numeric_limits::max()) { - foundMaximum = true; - break; - } - rejectedValues.push_back(bigintFilters[i]->upper() + 1); - // make sure there is another range possible above this one - if (bigintFilters[i]->upper() == std::numeric_limits::max() - 1) { - foundMaximum = true; - break; - } - } - - if (allNegatedValues && foundMaximum) { - return common::createNegatedBigintValues(rejectedValues, nullAllowed); - } - - return std::make_unique( - std::move(bigintFilters), nullAllowed); -} - -std::unique_ptr combineBytesRanges( - std::vector>& bytesFilters, - bool nullAllowed) { - bool allSingleValue = std::all_of( - bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { - return range->isSingleValue(); - }); - - if (allSingleValue) { - std::vector values; - values.reserve(bytesFilters.size()); - for (const auto& filter : bytesFilters) { - values.emplace_back(filter->lower()); - } - return std::make_unique(values, nullAllowed); - } - - int lowerUnbounded = 0, upperUnbounded = 0; - bool allExclusive = std::all_of( - bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { - return range->lowerExclusive() && range->upperExclusive(); - }); - if (allExclusive) { - folly::F14FastSet unmatched; - std::vector rejectedValues; - rejectedValues.reserve(bytesFilters.size()); - for (int i = 0; i < bytesFilters.size(); ++i) { - if (bytesFilters[i]->isLowerUnbounded()) { - ++lowerUnbounded; - } else { - if (unmatched.contains(bytesFilters[i]->lower())) { - unmatched.erase(bytesFilters[i]->lower()); - rejectedValues.emplace_back(bytesFilters[i]->lower()); - } else { - unmatched.insert(bytesFilters[i]->lower()); - } - } - if (bytesFilters[i]->isUpperUnbounded()) { - ++upperUnbounded; - } else { - if (unmatched.contains(bytesFilters[i]->upper())) { - unmatched.erase(bytesFilters[i]->upper()); - rejectedValues.emplace_back(bytesFilters[i]->upper()); - } else { - unmatched.insert(bytesFilters[i]->upper()); - } - } - } - - if (lowerUnbounded == 1 && upperUnbounded == 1 && unmatched.size() == 0) { - return std::make_unique( - rejectedValues, nullAllowed); - } - } - - if (bytesFilters.size() == 2 && bytesFilters[0]->isLowerUnbounded() && - bytesFilters[1]->isUpperUnbounded()) { - // create a negated bytes range instead - return std::make_unique( - bytesFilters[0]->upper(), - false, - !bytesFilters[0]->upperExclusive(), - bytesFilters[1]->lower(), - false, - !bytesFilters[1]->lowerExclusive(), - nullAllowed); - } - - std::vector> bytesGeneric; - for (int i = 0; i < bytesFilters.size(); ++i) { - bytesGeneric.emplace_back(std::unique_ptr( - dynamic_cast(bytesFilters[i].release()))); - } - - return std::make_unique( - std::move(bytesGeneric), nullAllowed, false); -} - -std::unique_ptr toFilter( - const TypePtr& type, - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter) { - if (type->isDate()) { - return dateRangeToFilter(range, nullAllowed, exprConverter, type); - } - switch (type->kind()) { - case TypeKind::TINYINT: - case TypeKind::SMALLINT: - case TypeKind::INTEGER: - case TypeKind::BIGINT: - return bigintRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::HUGEINT: - return hugeintRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::DOUBLE: - return floatingPointRangeToFilter( - range, nullAllowed, exprConverter, type); - case TypeKind::VARCHAR: - case TypeKind::VARBINARY: - return varcharRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::BOOLEAN: - return boolRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::REAL: - return floatingPointRangeToFilter( - range, nullAllowed, exprConverter, type); - case TypeKind::TIMESTAMP: - return timestampRangeToFilter(range, nullAllowed, exprConverter, type); - default: - VELOX_UNSUPPORTED("Unsupported range type: {}", type->toString()); - } -} - -std::unique_ptr toFilter( - const protocol::Domain& domain, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser) { - auto nullAllowed = domain.nullAllowed; - if (auto sortedRangeSet = - std::dynamic_pointer_cast(domain.values)) { - auto type = stringToType(sortedRangeSet->type, typeParser); - auto ranges = sortedRangeSet->ranges; - - if (ranges.empty()) { - VELOX_CHECK(nullAllowed, "Unexpected always-false filter"); - return std::make_unique(); - } - - if (ranges.size() == 1) { - // 'is not null' arrives as unbounded range with 'nulls not allowed'. - // We catch this case and create 'is not null' filter instead of the range - // filter. - const auto& range = ranges[0]; - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - if (lowUnbounded && highUnbounded && !nullAllowed) { - return std::make_unique(); - } - - return toFilter(type, ranges[0], nullAllowed, exprConverter); - } - - if (type->isDate()) { - std::vector> dateFilters; - dateFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - dateFilters.emplace_back( - dateRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return std::make_unique( - std::move(dateFilters), nullAllowed); - } - - if (type->kind() == TypeKind::BIGINT || type->kind() == TypeKind::INTEGER || - type->kind() == TypeKind::SMALLINT || - type->kind() == TypeKind::TINYINT) { - std::vector> bigintFilters; - bigintFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - bigintFilters.emplace_back( - bigintRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return combineIntegerRanges(bigintFilters, nullAllowed); - } - - if (type->kind() == TypeKind::VARCHAR) { - std::vector> bytesFilters; - bytesFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - bytesFilters.emplace_back( - varcharRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return combineBytesRanges(bytesFilters, nullAllowed); - } - - if (type->kind() == TypeKind::BOOLEAN) { - VELOX_CHECK_EQ(ranges.size(), 2, "Multi bool ranges size can only be 2."); - std::unique_ptr boolFilter; - for (const auto& range : ranges) { - auto filter = - boolRangeToFilter(range, nullAllowed, exprConverter, type); - if (filter->kind() == common::FilterKind::kAlwaysFalse or - filter->kind() == common::FilterKind::kIsNull) { - continue; - } - VELOX_CHECK_NULL(boolFilter); - boolFilter = std::move(filter); - } - - VELOX_CHECK_NOT_NULL(boolFilter); - return boolFilter; - } - - std::vector> filters; - filters.reserve(ranges.size()); - for (const auto& range : ranges) { - filters.emplace_back(toFilter(type, range, nullAllowed, exprConverter)); - } - - return std::make_unique( - std::move(filters), nullAllowed, false); - } else if ( - auto equatableValueSet = - std::dynamic_pointer_cast( - domain.values)) { - if (equatableValueSet->entries.empty()) { - if (nullAllowed) { - return std::make_unique(); - } else { - return std::make_unique(); - } - } - VELOX_UNSUPPORTED( - "EquatableValueSet (with non-empty entries) to Velox filter conversion is not supported yet."); - } else if ( - auto allOrNoneValueSet = - std::dynamic_pointer_cast( - domain.values)) { - VELOX_UNSUPPORTED( - "AllOrNoneValueSet to Velox filter conversion is not supported yet."); - } - VELOX_UNSUPPORTED("Unsupported filter found."); -} - connector::hive::LocationHandle::TableType toTableType( protocol::hive::TableType tableType) { switch (tableType) { @@ -973,12 +322,6 @@ velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( } // namespace -TypePtr stringToType( - const std::string& typeString, - const TypeParser& typeParser) { - return typeParser.parse(typeString); -} - std::vector toRequiredSubfields( const protocol::List& subfields) { std::vector result; diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h index 5462fdd53984f..0e86b87a241fe 100644 --- a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h @@ -35,10 +35,6 @@ void unregisterPrestoToVeloxConnector(const std::string& connectorName); const PrestoToVeloxConnector& getPrestoToVeloxConnector( const std::string& connectorName); -velox::TypePtr stringToType( - const std::string& typeString, - const TypeParser& typeParser); - std::vector toRequiredSubfields( const protocol::List& subfields); diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp new file mode 100644 index 0000000000000..5dbf73b9cb869 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.cpp @@ -0,0 +1,698 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" + +#include + +namespace facebook::presto { + +using namespace facebook::velox; + +namespace { + +int64_t toInt64( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return VariantConverter::convert(value) + .value(); +} + +int128_t toInt128( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +Timestamp toTimestamp( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + const auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +int64_t dateToInt64( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +template +T toFloatingPoint( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto variant = exprConverter.getConstantValue(type, *block); + return variant.value(); +} + +std::string toString( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + if (type->isVarbinary()) { + return value.value(); + } + return value.value(); +} + +bool toBoolean( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto variant = exprConverter.getConstantValue(type, *block); + return variant.value(); +} + +std::unique_ptr bigintRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded ? std::numeric_limits::min() + : toInt64(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toInt64(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr hugeintRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded ? std::numeric_limits::min() + : toInt128(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toInt128(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr timestampRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + const bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded + ? std::numeric_limits::min() + : toTimestamp(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + ++low; + } + + const bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toTimestamp(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + --high; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr boolRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + + if (!lowUnbounded && !highUnbounded) { + bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); + bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); + VELOX_CHECK_EQ( + lowValue, + highValue, + "Boolean range should not be [FALSE, TRUE] after coordinator " + "optimization."); + return std::make_unique(lowValue, nullAllowed); + } + // Presto coordinator has made optimizations to the bool range already. For + // example, [FALSE, TRUE) will be optimized and shown here as (-infinity, + // TRUE). Plus (-infinity, +infinity) case has been guarded in toFilter() + // method, here it can only be one side bounded scenarios. + VELOX_CHECK_NE( + lowUnbounded, + highUnbounded, + "Passed in boolean range can only have one side bounded range scenario"); + if (!lowUnbounded) { + VELOX_CHECK( + highUnbounded, + "Boolean range should not be double side bounded after coordinator " + "optimization."); + bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); + + // (TRUE, +infinity) case, should resolve to filter all + if (lowExclusive && lowValue) { + if (nullAllowed) { + return std::make_unique(); + } + return std::make_unique(); + } + + // Both cases (FALSE, +infinity) or [TRUE, +infinity) should evaluate to + // true. Case [FALSE, +infinity) should not be expected + VELOX_CHECK( + !(!lowExclusive && !lowValue), + "Case [FALSE, +infinity) should " + "not be expected"); + return std::make_unique(true, nullAllowed); + } + if (!highUnbounded) { + VELOX_CHECK( + lowUnbounded, + "Boolean range should not be double side bounded after coordinator " + "optimization."); + bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); + + // (-infinity, FALSE) case, should resolve to filter all + if (highExclusive && !highValue) { + if (nullAllowed) { + return std::make_unique(); + } + return std::make_unique(); + } + + // Both cases (-infinity, TRUE) or (-infinity, FALSE] should evaluate to + // false. Case (-infinity, TRUE] should not be expected + VELOX_CHECK( + !(!highExclusive && highValue), + "Case (-infinity, TRUE] should " + "not be expected"); + return std::make_unique(false, nullAllowed); + } + VELOX_UNREACHABLE(); +} + +template +std::unique_ptr floatingPointRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + auto low = lowUnbounded + ? (-1.0 * std::numeric_limits::infinity()) + : toFloatingPoint(range.low.valueBlock, exprConverter, type); + + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + auto high = highUnbounded + ? std::numeric_limits::infinity() + : toFloatingPoint(range.high.valueBlock, exprConverter, type); + + // Handle NaN cases as NaN is not supported as a limit in Velox Filters + if (!lowUnbounded && std::isnan(low)) { + if (lowExclusive) { + // x > NaN is always false as NaN is considered the largest value. + return std::make_unique(); + } + // Equivalent to x > infinity as only NaN is greater than infinity + // Presto currently converts x >= NaN into the filter with domain + // [NaN, max), so ignoring the high value is fine. + low = std::numeric_limits::infinity(); + lowExclusive = true; + high = std::numeric_limits::infinity(); + highUnbounded = true; + highExclusive = false; + } else if (!highUnbounded && std::isnan(high)) { + high = std::numeric_limits::infinity(); + if (highExclusive) { + // equivalent to x in [low , infinity] or (low , infinity] + highExclusive = false; + } else { + if (lowUnbounded) { + // Anything <= NaN is true as NaN is the largest possible value. + return std::make_unique(); + } + // Equivalent to x > low or x >=low + highUnbounded = true; + } + } + + return std::make_unique>( + low, + lowUnbounded, + lowExclusive, + high, + highUnbounded, + highExclusive, + nullAllowed); +} + +std::unique_ptr varcharRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + auto low = + lowUnbounded ? "" : toString(range.low.valueBlock, exprConverter, type); + + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + auto high = + highUnbounded ? "" : toString(range.high.valueBlock, exprConverter, type); + return std::make_unique( + low, + lowUnbounded, + lowExclusive, + high, + highUnbounded, + highExclusive, + nullAllowed); +} + +std::unique_ptr dateRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded + ? std::numeric_limits::min() + : dateToInt64(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : dateToInt64(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr toFilter( + const TypePtr& type, + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter) { + if (type->isDate()) { + return dateRangeToFilter(range, nullAllowed, exprConverter, type); + } + switch (type->kind()) { + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + return bigintRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::HUGEINT: + return hugeintRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::DOUBLE: + return floatingPointRangeToFilter( + range, nullAllowed, exprConverter, type); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return varcharRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::BOOLEAN: + return boolRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::REAL: + return floatingPointRangeToFilter( + range, nullAllowed, exprConverter, type); + case TypeKind::TIMESTAMP: + return timestampRangeToFilter(range, nullAllowed, exprConverter, type); + default: + VELOX_UNSUPPORTED("Unsupported range type: {}", type->toString()); + } +} + +std::unique_ptr combineIntegerRanges( + std::vector>& bigintFilters, + bool nullAllowed) { + bool allSingleValue = std::all_of( + bigintFilters.begin(), bigintFilters.end(), [](const auto& range) { + return range->isSingleValue(); + }); + + if (allSingleValue) { + std::vector values; + values.reserve(bigintFilters.size()); + for (const auto& filter : bigintFilters) { + values.emplace_back(filter->lower()); + } + return common::createBigintValues(values, nullAllowed); + } + + if (bigintFilters.size() == 2 && + bigintFilters[0]->lower() == std::numeric_limits::min() && + bigintFilters[1]->upper() == std::numeric_limits::max()) { + assert(bigintFilters[0]->upper() + 1 <= bigintFilters[1]->lower() - 1); + return std::make_unique( + bigintFilters[0]->upper() + 1, + bigintFilters[1]->lower() - 1, + nullAllowed); + } + + bool allNegatedValues = true; + bool foundMaximum = false; + assert(bigintFilters.size() > 1); // true by size checks on ranges + std::vector rejectedValues; + + // check if int64 min is a rejected value + if (bigintFilters[0]->lower() == std::numeric_limits::min() + 1) { + rejectedValues.emplace_back(std::numeric_limits::min()); + } + if (bigintFilters[0]->lower() > std::numeric_limits::min() + 1) { + // too many value at the lower end, bail out + return std::make_unique( + std::move(bigintFilters), nullAllowed); + } + rejectedValues.push_back(bigintFilters[0]->upper() + 1); + for (int i = 1; i < bigintFilters.size(); ++i) { + if (bigintFilters[i]->lower() != bigintFilters[i - 1]->upper() + 2) { + allNegatedValues = false; + break; + } + if (bigintFilters[i]->upper() == std::numeric_limits::max()) { + foundMaximum = true; + break; + } + rejectedValues.push_back(bigintFilters[i]->upper() + 1); + // make sure there is another range possible above this one + if (bigintFilters[i]->upper() == std::numeric_limits::max() - 1) { + foundMaximum = true; + break; + } + } + + if (allNegatedValues && foundMaximum) { + return common::createNegatedBigintValues(rejectedValues, nullAllowed); + } + + return std::make_unique( + std::move(bigintFilters), nullAllowed); +} + +std::unique_ptr combineBytesRanges( + std::vector>& bytesFilters, + bool nullAllowed) { + bool allSingleValue = std::all_of( + bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { + return range->isSingleValue(); + }); + + if (allSingleValue) { + std::vector values; + values.reserve(bytesFilters.size()); + for (const auto& filter : bytesFilters) { + values.emplace_back(filter->lower()); + } + return std::make_unique(values, nullAllowed); + } + + int lowerUnbounded = 0, upperUnbounded = 0; + bool allExclusive = std::all_of( + bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { + return range->lowerExclusive() && range->upperExclusive(); + }); + if (allExclusive) { + folly::F14FastSet unmatched; + std::vector rejectedValues; + rejectedValues.reserve(bytesFilters.size()); + for (int i = 0; i < bytesFilters.size(); ++i) { + if (bytesFilters[i]->isLowerUnbounded()) { + ++lowerUnbounded; + } else { + if (unmatched.contains(bytesFilters[i]->lower())) { + unmatched.erase(bytesFilters[i]->lower()); + rejectedValues.emplace_back(bytesFilters[i]->lower()); + } else { + unmatched.insert(bytesFilters[i]->lower()); + } + } + if (bytesFilters[i]->isUpperUnbounded()) { + ++upperUnbounded; + } else { + if (unmatched.contains(bytesFilters[i]->upper())) { + unmatched.erase(bytesFilters[i]->upper()); + rejectedValues.emplace_back(bytesFilters[i]->upper()); + } else { + unmatched.insert(bytesFilters[i]->upper()); + } + } + } + + if (lowerUnbounded == 1 && upperUnbounded == 1 && unmatched.size() == 0) { + return std::make_unique( + rejectedValues, nullAllowed); + } + } + + if (bytesFilters.size() == 2 && bytesFilters[0]->isLowerUnbounded() && + bytesFilters[1]->isUpperUnbounded()) { + // create a negated bytes range instead + return std::make_unique( + bytesFilters[0]->upper(), + false, + !bytesFilters[0]->upperExclusive(), + bytesFilters[1]->lower(), + false, + !bytesFilters[1]->lowerExclusive(), + nullAllowed); + } + + std::vector> bytesGeneric; + for (int i = 0; i < bytesFilters.size(); ++i) { + bytesGeneric.emplace_back( + std::unique_ptr( + dynamic_cast(bytesFilters[i].release()))); + } + + return std::make_unique( + std::move(bytesGeneric), nullAllowed, false); +} + +} // namespace + +TypePtr stringToType( + const std::string& typeString, + const TypeParser& typeParser) { + return typeParser.parse(typeString); +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + return type; +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& rowType = type->asRow(); + std::vector names; + std::vector types; + names.reserve(type->size()); + types.reserve(type->size()); + for (int i = 0; i < rowType.size(); i++) { + std::string name = rowType.nameOf(i); + folly::toLowerAscii(name); + names.push_back(std::move(name)); + auto& childType = rowType.childAt(i); + types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, childType->kind(), childType)); + } + return std::make_shared(std::move(names), std::move(types)); +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& keyType = type->childAt(0); + auto& valueType = type->childAt(1); + return std::make_shared( + VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, keyType->kind(), keyType), + VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, valueType->kind(), valueType)); +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& elementType = type->childAt(0); + return std::make_shared(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, elementType->kind(), elementType)); +} + +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); +template TypePtr fieldNamesToLowerCase(const TypePtr&); + +std::unique_ptr toFilter( + const protocol::Domain& domain, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser) { + auto nullAllowed = domain.nullAllowed; + if (auto sortedRangeSet = + std::dynamic_pointer_cast(domain.values)) { + auto type = stringToType(sortedRangeSet->type, typeParser); + auto ranges = sortedRangeSet->ranges; + + if (ranges.empty()) { + VELOX_CHECK(nullAllowed, "Unexpected always-false filter"); + return std::make_unique(); + } + + if (ranges.size() == 1) { + // 'is not null' arrives as unbounded range with 'nulls not allowed'. + // We catch this case and create 'is not null' filter instead of the range + // filter. + const auto& range = ranges[0]; + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + if (lowUnbounded && highUnbounded && !nullAllowed) { + return std::make_unique(); + } + + return toFilter(type, ranges[0], nullAllowed, exprConverter); + } + + if (type->isDate()) { + std::vector> dateFilters; + dateFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + dateFilters.emplace_back( + dateRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return std::make_unique( + std::move(dateFilters), nullAllowed); + } + + if (type->kind() == TypeKind::BIGINT || type->kind() == TypeKind::INTEGER || + type->kind() == TypeKind::SMALLINT || + type->kind() == TypeKind::TINYINT) { + std::vector> bigintFilters; + bigintFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + bigintFilters.emplace_back( + bigintRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return combineIntegerRanges(bigintFilters, nullAllowed); + } + + if (type->kind() == TypeKind::VARCHAR) { + std::vector> bytesFilters; + bytesFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + bytesFilters.emplace_back( + varcharRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return combineBytesRanges(bytesFilters, nullAllowed); + } + + if (type->kind() == TypeKind::BOOLEAN) { + VELOX_CHECK_EQ(ranges.size(), 2, "Multi bool ranges size can only be 2."); + std::unique_ptr boolFilter; + for (const auto& range : ranges) { + auto filter = + boolRangeToFilter(range, nullAllowed, exprConverter, type); + if (filter->kind() == common::FilterKind::kAlwaysFalse or + filter->kind() == common::FilterKind::kIsNull) { + continue; + } + VELOX_CHECK_NULL(boolFilter); + boolFilter = std::move(filter); + } + + VELOX_CHECK_NOT_NULL(boolFilter); + return boolFilter; + } + + std::vector> filters; + filters.reserve(ranges.size()); + for (const auto& range : ranges) { + filters.emplace_back(toFilter(type, range, nullAllowed, exprConverter)); + } + + return std::make_unique( + std::move(filters), nullAllowed, false); + } else if ( + auto equatableValueSet = + std::dynamic_pointer_cast( + domain.values)) { + if (equatableValueSet->entries.empty()) { + if (nullAllowed) { + return std::make_unique(); + } else { + return std::make_unique(); + } + } + VELOX_UNSUPPORTED( + "EquatableValueSet (with non-empty entries) to Velox filter conversion is not supported yet."); + } else if ( + auto allOrNoneValueSet = + std::dynamic_pointer_cast( + domain.values)) { + VELOX_UNSUPPORTED( + "AllOrNoneValueSet to Velox filter conversion is not supported yet."); + } + VELOX_UNSUPPORTED("Unsupported filter found."); +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h new file mode 100644 index 0000000000000..ef701f87423a2 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "presto_cpp/main/types/PrestoToVeloxExpr.h" +#include "presto_cpp/presto_protocol/core/presto_protocol_core.h" +#include "velox/type/Filter.h" +#include "velox/type/Type.h" + +namespace facebook::presto { + +velox::TypePtr stringToType( + const std::string& typeString, + const TypeParser& typeParser); + +template +velox::TypePtr fieldNamesToLowerCase(const velox::TypePtr& type); + +std::unique_ptr toFilter( + const protocol::Domain& domain, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser); + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index 6824b8bc95751..e649d2425ed84 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -34,6 +34,7 @@ #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h" #include "presto_cpp/main/types/TypeParser.h" +#include "presto_cpp/main/connectors/PrestoToVeloxConnectorUtils.h" using namespace facebook::velox; using namespace facebook::velox::exec; diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp index c4f2752fda05d..5c3364aa7cced 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp @@ -11,15 +11,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" -#include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" #include +#include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h" +#include "presto_cpp/main/types/PrestoToVeloxExpr.h" +#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h" +#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/TableHandle.h" using namespace facebook::presto; using namespace facebook::velox; -class PrestoToVeloxConnectorTest : public ::testing::Test {}; +class PrestoToVeloxConnectorTest : public ::testing::Test { + protected: + void SetUp() override { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + pool_ = memory::memoryManager()->addLeafPool(); + typeParser_ = std::make_unique(); + exprConverter_ = + std::make_unique(pool_.get(), typeParser_.get()); + } + + std::shared_ptr pool_; + std::unique_ptr typeParser_; + std::unique_ptr exprConverter_; +}; TEST_F(PrestoToVeloxConnectorTest, registerVariousConnectors) { std::vector>> @@ -53,3 +72,115 @@ TEST_F(PrestoToVeloxConnectorTest, addDuplicates) { std::make_unique(kConnectorName)), fmt::format("Connector {} is already registered", kConnectorName)); } + +namespace { + +constexpr auto kColumnName1 = "MixedCaseCol1"; +constexpr auto kColumnName2 = "UPPERCASECOL2"; + +protocol::List createTestDataColumns() { + protocol::List dataColumns; + protocol::Column col1; + col1.name = kColumnName1; + col1.type = "integer"; + dataColumns.push_back(col1); + + protocol::Column col2; + col2.name = kColumnName2; + col2.type = "varchar"; + dataColumns.push_back(col2); + + return dataColumns; +} + +std::shared_ptr createTrueConstant() { + auto trueConstant = std::make_shared(); + trueConstant->type = "boolean"; + // base64-encoded true value. + trueConstant->valueBlock.data = "CgAAAEJZVEVfQVJSQVkBAAAAAAE="; + return trueConstant; +} + +template +void setCommonLayoutProperties( + std::shared_ptr layout, + const protocol::List& dataColumns, + std::shared_ptr predicate) { + layout->domainPredicate.domains = + std::make_shared>(); + layout->remainingPredicate = predicate; + layout->pushdownFilterEnabled = false; + layout->dataColumns = dataColumns; + layout->partitionColumns = {}; + layout->predicateColumns = {}; +} + +} // namespace + +TEST_F(PrestoToVeloxConnectorTest, icebergPreservesColumnNameCase) { + auto dataColumns = createTestDataColumns(); + auto trueConstant = createTrueConstant(); + + auto layout = std::make_shared(); + setCommonLayoutProperties(layout, dataColumns, trueConstant); + + auto icebergHandle = + std::make_shared(); + icebergHandle->schemaName = "test_schema"; + icebergHandle->icebergTableName.tableName = "test_table"; + + protocol::TableHandle tableHandle; + tableHandle.connectorId = "iceberg"; + tableHandle.connectorHandle = icebergHandle; + tableHandle.connectorTableLayout = layout; + + IcebergPrestoToVeloxConnector icebergConnector("iceberg"); + connector::ColumnHandleMap assignments; + auto result = icebergConnector.toVeloxTableHandle( + tableHandle, *exprConverter_, *typeParser_, assignments); + + ASSERT_NE(result, nullptr); + auto* handle = dynamic_cast(result.get()); + ASSERT_NE(handle, nullptr); + + // Verify Iceberg preserves column name case. + auto dataColumnsType = handle->dataColumns(); + ASSERT_NE(dataColumnsType, nullptr); + EXPECT_EQ(dataColumnsType->size(), 2); + EXPECT_EQ(dataColumnsType->nameOf(0), kColumnName1); + EXPECT_EQ(dataColumnsType->nameOf(1), kColumnName2); +} + +TEST_F(PrestoToVeloxConnectorTest, hiveLowercasesColumnNames) { + auto dataColumns = createTestDataColumns(); + auto trueConstant = createTrueConstant(); + + auto layout = std::make_shared(); + setCommonLayoutProperties(layout, dataColumns, trueConstant); + layout->tableParameters = {}; + + auto hiveHandle = std::make_shared(); + hiveHandle->tableName = "test_table"; + hiveHandle->schemaName = "test_schema"; + + protocol::TableHandle tableHandle; + tableHandle.connectorId = "hive"; + tableHandle.connectorHandle = hiveHandle; + tableHandle.connectorTableLayout = layout; + + HivePrestoToVeloxConnector hiveConnector("hive"); + connector::ColumnHandleMap assignments; + auto result = hiveConnector.toVeloxTableHandle( + tableHandle, *exprConverter_, *typeParser_, assignments); + + ASSERT_NE(result, nullptr); + auto* handle = dynamic_cast(result.get()); + ASSERT_NE(handle, nullptr); + + // Verify Hive lowercases column names. + auto dataColumnsType = handle->dataColumns(); + ASSERT_NE(dataColumnsType, nullptr); + EXPECT_EQ(dataColumnsType->size(), 2); + EXPECT_EQ(dataColumnsType->nameOf(0), "mixedcasecol1"); + EXPECT_EQ(dataColumnsType->nameOf(1), "uppercasecol2"); +}