Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 0 additions & 78 deletions velox/connectors/hive/HivePartitionFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,6 @@ int32_t hashInt64(int64_t value) {
return ((*reinterpret_cast<uint64_t*>(&value)) >> 32) ^ value;
}

template <typename T>
inline int32_t hashDecimal(T value, uint8_t scale) {
bool isNegative = value < 0;
uint64_t absValue =
isNegative ? -static_cast<uint64_t>(value) : static_cast<uint64_t>(value);

uint32_t high = absValue >> 32;
uint32_t low = absValue;

uint32_t hash = 31 * high + low;
if (isNegative) {
hash = -hash;
}

return 31 * hash + scale;
}

// Simulates Hive's hashing function from Hive v1.2.1
// org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#hashcode()
// Returns java BigDecimal#hashCode()
template <>
inline int32_t hashDecimal<int128_t>(int128_t value, uint8_t scale) {
uint32_t words[4];
bool isNegative = value < 0;
uint128_t absValue = isNegative ? -value : value;
words[0] = absValue >> 96;
words[1] = absValue >> 64;
words[2] = absValue >> 32;
words[3] = absValue;

uint32_t hash = 0;
for (auto i = 0; i < 4; i++) {
hash = 31 * hash + words[i];
}
if (isNegative) {
hash = -hash;
}
return hash * 31 + scale;
}

#if defined(__has_feature)
#if __has_feature(__address_sanitizer__)
__attribute__((no_sanitize("integer")))
Expand Down Expand Up @@ -155,34 +115,6 @@ void hashPrimitive(
const SelectivityVector& rows,
bool mix,
std::vector<uint32_t>& hashes) {
const auto& type = values.base()->type();
if constexpr (kind == TypeKind::BIGINT || kind == TypeKind::HUGEINT) {
if (type->isDecimal()) {
const auto scale = getDecimalPrecisionScale(*type).second;
if (rows.isAllSelected()) {
vector_size_t numRows = rows.size();
for (auto i = 0; i < numRows; ++i) {
const uint32_t hash = values.isNullAt(i)
? 0
: hashDecimal(
values.valueAt<typename TypeTraits<kind>::NativeType>(i),
scale);
mergeHash(mix, hash, hashes[i]);
}
} else {
rows.applyToSelected([&](auto row) INLINE_LAMBDA {
const uint32_t hash = values.isNullAt(row)
? 0
: hashDecimal(
values.valueAt<typename TypeTraits<kind>::NativeType>(row),
scale);
mergeHash(mix, hash, hashes[row]);
});
}
return;
}
}

if (rows.isAllSelected()) {
// The compiler seems to be a little fickle with optimizations.
// Although rows.applyToSelected should do roughly the same thing, doing
Expand Down Expand Up @@ -279,16 +211,6 @@ void HivePartitionFunction::hashTyped<TypeKind::BIGINT>(
hashPrimitive<TypeKind::BIGINT>(values, rows, mix, hashes);
}

template <>
void HivePartitionFunction::hashTyped<TypeKind::HUGEINT>(
const DecodedVector& values,
const SelectivityVector& rows,
bool mix,
std::vector<uint32_t>& hashes,
size_t /* poolIndex */) {
hashPrimitive<TypeKind::HUGEINT>(values, rows, mix, hashes);
}

template <>
void HivePartitionFunction::hashTyped<TypeKind::DOUBLE>(
const DecodedVector& values,
Expand Down
17 changes: 0 additions & 17 deletions velox/connectors/hive/HivePartitionUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace facebook::velox::connector::hive {
case TypeKind::SMALLINT: \
case TypeKind::INTEGER: \
case TypeKind::BIGINT: \
case TypeKind::HUGEINT: \
case TypeKind::VARCHAR: \
case TypeKind::VARBINARY: \
case TypeKind::TIMESTAMP: \
Expand Down Expand Up @@ -90,22 +89,6 @@ std::pair<std::string, std::string> makePartitionKeyValueString(
DATE()->toString(
partitionVector->as<SimpleVector<int32_t>>()->valueAt(row)));
}
if constexpr (Kind == TypeKind::BIGINT || Kind == TypeKind::HUGEINT) {
if (partitionVector->type()->isDecimal()) {
auto [precision, scale] =
getDecimalPrecisionScale(*partitionVector->type());
const auto maxStringSize =
DecimalUtil::maxStringViewSize(precision, scale);
std::vector<char> maxString(maxStringSize);
const auto size = DecimalUtil::castToString(
partitionVector->as<SimpleVector<T>>()->valueAt(row),
scale,
maxStringSize,
maxString.data());
return std::make_pair(name, std::string(maxString.data(), size));
}
}

return std::make_pair(
name,
makePartitionValueString(
Expand Down
12 changes: 0 additions & 12 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,6 @@ VectorPtr newConstantFromStringImpl(
pool, 1, false, type, std::move(days));
}

if constexpr (std::is_same_v<T, int64_t> || std::is_same_v<T, int128_t>) {
if (type->isDecimal()) {
auto [precision, scale] = getDecimalPrecisionScale(*type);
T result;
const auto status = DecimalUtil::castFromString<T>(
StringView(value.value()), precision, scale, result);
VELOX_USER_CHECK(status.ok(), status.message());
return std::make_shared<ConstantVector<T>>(
pool, 1, false, type, std::move(result));
}
}

if constexpr (std::is_same_v<T, StringView>) {
return std::make_shared<ConstantVector<StringView>>(
pool, 1, false, type, StringView(value.value()));
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace facebook::velox::connector::hive {
/// converted to their appropriate types.
///
/// @param type The target Velox type for the constant vector. Supports all
/// scalar types including primitives, dates, timestamps, and decimals.
/// scalar types including primitives, dates, timestamps.
/// @param value The string representation of the value to convert, formatted
/// the same way as CAST(x as VARCHAR). Date values must be formatted using ISO
/// 8601 as YYYY-MM-DD. If nullopt, creates a null constant vector.
Expand Down
45 changes: 0 additions & 45 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,51 +921,6 @@ TEST_F(HiveIcebergTest, partitionColumnsFromHive) {
AssertQueryBuilder(plan).splits(icebergSplits).assertResults(expectedVectors);
}

TEST_F(HiveIcebergTest, mixedScenario) {
auto fileRowType = ROW({"c0", "c1"}, {BIGINT(), VARCHAR()});
auto tableRowType =
ROW({"c0", "c1", "c2", "region"},
{BIGINT(), VARCHAR(), INTEGER(), DECIMAL(38, 5)});

// Write data file with c0 and c1.
std::vector<RowVectorPtr> dataVectors;
dataVectors.push_back(makeRowVector({
makeFlatVector<int64_t>({100, 200}),
makeFlatVector<std::string>({"a", "b"}),
}));
auto dataFilePath = TempFilePath::create();
writeToFile(dataFilePath->getPath(), dataVectors);

std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
partitionKeys["region"] = "12345.67890";

auto icebergSplits =
makeIcebergSplits(dataFilePath->getPath(), {}, partitionKeys);
auto assignments = makeColumnHandles(tableRowType, {3});

// Expected result:
// - c0, c1: from file.
// - c2: NULL (schema evolution).
// - region: from partition keys (DECIMAL(38,5) = 12345.67890).
std::vector<RowVectorPtr> expectedVectors;
expectedVectors.push_back(makeRowVector(
tableRowType->names(),
{
dataVectors[0]->childAt(0),
dataVectors[0]->childAt(1),
makeNullConstant(TypeKind::INTEGER, 2),
makeFlatVector<int128_t>(
{1'234'567'890, 1'234'567'890}, DECIMAL(38, 5)),
}));

// Read with table schema: c0, c1 (from file), c2 (new column), region
// (partition).
auto plan = PlanBuilder()
.tableScan(tableRowType, {}, "", tableRowType, assignments)
.planNode();
AssertQueryBuilder(plan).splits(icebergSplits).assertResults(expectedVectors);
}

#ifdef VELOX_ENABLE_PARQUET
TEST_F(HiveIcebergTest, positionalDeleteFileWithRowGroupFilter) {
// This file contains three row groups, each with about 100 rows.
Expand Down
61 changes: 0 additions & 61 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,67 +598,6 @@ TEST_F(HiveDataSinkTest, basicBucket) {
verifyWrittenData(outputDirectory->getPath(), numBuckets);
}

TEST_F(HiveDataSinkTest, decimalPartition) {
const auto outputDirectory = TempDirectoryPath::create();

connectorSessionProperties_->set(
HiveConfig::kSortWriterFinishTimeSliceLimitMsSession, "1");
const auto rowType =
ROW({"c0", "c1", "c2"}, {BIGINT(), DECIMAL(14, 3), DECIMAL(20, 4)});
auto dataSink = createDataSink(
rowType,
outputDirectory->getPath(),
dwio::common::FileFormat::DWRF,
{"c2"});
auto stats = dataSink->stats();
ASSERT_TRUE(stats.empty()) << stats.toString();

const auto vector = makeRowVector(
{makeNullableFlatVector<int64_t>({1, 2, std::nullopt, 345}),
makeNullableFlatVector<int64_t>(
{1, 2, std::nullopt, 345}, DECIMAL(14, 3)),
makeFlatVector<int128_t>({1, 340, 234567, -345}, DECIMAL(20, 4))});

dataSink->appendData(vector);
while (!dataSink->finish()) {
}
const auto partitions = dataSink->close();
stats = dataSink->stats();
ASSERT_FALSE(stats.empty());
ASSERT_EQ(partitions.size(), vector->size());

createDuckDbTable({vector});

const auto rootPath = outputDirectory->getPath();
std::vector<std::shared_ptr<ConnectorSplit>> splits;
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
auto partitionPath = [&](std::string value) {
partitionKeys["c2"] = value;
auto path = listFiles(rootPath + "/c2=" + value)[0];
splits.push_back(makeHiveConnectorSplits(
path, 1, dwio::common::FileFormat::DWRF, partitionKeys)
.back());
};
partitionPath("0.0001");
partitionPath("0.0340");
partitionPath("23.4567");
partitionPath("-0.0345");

ColumnHandleMap assignments = {
{"c0", regularColumn("c0", BIGINT())},
{"c1", regularColumn("c1", DECIMAL(14, 3))},
{"c2", partitionKey("c2", DECIMAL(20, 4))}};

auto op = PlanBuilder()
.startTableScan()
.outputType(rowType)
.assignments(assignments)
.endTableScan()
.planNode();

assertQuery(op, splits, fmt::format("SELECT * FROM tmp"));
}

TEST_F(HiveDataSinkTest, close) {
for (bool empty : {true, false}) {
SCOPED_TRACE(fmt::format("Data sink is empty: {}", empty));
Expand Down
52 changes: 0 additions & 52 deletions velox/connectors/hive/tests/HivePartitionFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,58 +124,6 @@ TEST_F(HivePartitionFunctionTest, bigint) {
assertPartitionsWithConstChannel(values, 997);
}

TEST_F(HivePartitionFunctionTest, shortDecimal) {
auto values = makeNullableFlatVector<int64_t>(
{std::nullopt,
300'000'000'000,
123456789,
DecimalUtil::kShortDecimalMin / 100,
DecimalUtil::kShortDecimalMax / 100},
DECIMAL(18, 2));

assertPartitions(values, 1, {0, 0, 0, 0, 0});
assertPartitions(values, 2, {0, 1, 1, 1, 1});
assertPartitions(values, 500, {0, 471, 313, 115, 37});
assertPartitions(values, 997, {0, 681, 6, 982, 502});

assertPartitionsWithConstChannel(values, 1);
assertPartitionsWithConstChannel(values, 2);
assertPartitionsWithConstChannel(values, 500);
assertPartitionsWithConstChannel(values, 997);

values = makeFlatVector<int64_t>(
{123456789, DecimalUtil::kShortDecimalMin, DecimalUtil::kShortDecimalMax},
DECIMAL(18, 0));
assertPartitions(values, 500, {311, 236, 412});
}

TEST_F(HivePartitionFunctionTest, longDecimal) {
auto values = makeNullableFlatVector<int128_t>(
{std::nullopt,
300'000'000'000,
HugeInt::parse("12345678901234567891"),
DecimalUtil::kLongDecimalMin / 100,
DecimalUtil::kLongDecimalMax / 100},
DECIMAL(38, 2));

assertPartitions(values, 1, {0, 0, 0, 0, 0});
assertPartitions(values, 2, {0, 1, 1, 1, 1});
assertPartitions(values, 500, {0, 471, 99, 49, 103});
assertPartitions(values, 997, {0, 681, 982, 481, 6});

assertPartitionsWithConstChannel(values, 1);
assertPartitionsWithConstChannel(values, 2);
assertPartitionsWithConstChannel(values, 500);
assertPartitionsWithConstChannel(values, 997);

values = makeNullableFlatVector<int128_t>(
{HugeInt::parse("1234567890123456789112345678"),
DecimalUtil::kLongDecimalMin,
DecimalUtil::kLongDecimalMax},
DECIMAL(38, 0));
assertPartitions(values, 997, {51, 835, 645});
}

TEST_F(HivePartitionFunctionTest, varchar) {
auto values = makeNullableFlatVector<std::string>(
{std::nullopt,
Expand Down
21 changes: 5 additions & 16 deletions velox/connectors/hive/tests/HivePartitionUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ TEST_F(HivePartitionUtilTest, partitionName) {
"flat_bigint_col",
"dict_string_col",
"const_date_col",
"flat_timestamp_col",
"short_decimal_col",
"long_decimal_col"},
"flat_timestamp_col"},
{makeFlatVector<bool>(std::vector<bool>{false}),
makeFlatVector<int8_t>(std::vector<int8_t>{10}),
makeFlatVector<int16_t>(std::vector<int16_t>{100}),
Expand All @@ -85,10 +83,7 @@ TEST_F(HivePartitionUtilTest, partitionName) {
makeDictionary<StringView>(std::vector<StringView>{"str1000"}),
makeConstant<int32_t>(10000, 1, DATE()),
makeFlatVector<Timestamp>(
std::vector<Timestamp>{Timestamp::fromMillis(1577836800000)}),
makeConstant<int64_t>(10000, 1, DECIMAL(12, 2)),
makeConstant<int128_t>(
DecimalUtil::kLongDecimalMin / 100, 1, DECIMAL(38, 2))});
std::vector<Timestamp>{Timestamp::fromMillis(1577836800000)})});

std::vector<std::string> expectedPartitionKeyValues{
"flat_bool_col=false",
Expand All @@ -98,9 +93,7 @@ TEST_F(HivePartitionUtilTest, partitionName) {
"flat_bigint_col=10000",
"dict_string_col=str1000",
"const_date_col=1997-05-19",
"flat_timestamp_col=2019-12-31 16%3A00%3A00.0",
"short_decimal_col=100.00",
"long_decimal_col=-" + std::string(34, '9') + ".99"};
"flat_timestamp_col=2019-12-31 16%3A00%3A00.0"};

std::vector<column_index_t> partitionChannels;
for (auto i = 1; i <= expectedPartitionKeyValues.size(); i++) {
Expand Down Expand Up @@ -147,9 +140,7 @@ TEST_F(HivePartitionUtilTest, partitionNameForNull) {
"flat_bigint_col",
"flat_string_col",
"const_date_col",
"flat_timestamp_col",
"short_decimal_col",
"long_decimal_col"};
"flat_timestamp_col"};

RowVectorPtr input = makeRowVector(
partitionColumnNames,
Expand All @@ -160,9 +151,7 @@ TEST_F(HivePartitionUtilTest, partitionNameForNull) {
makeNullableFlatVector<int64_t>({std::nullopt}),
makeNullableFlatVector<StringView>({std::nullopt}),
makeConstant<int32_t>(std::nullopt, 1, DATE()),
makeNullableFlatVector<Timestamp>({std::nullopt}),
makeConstant<int64_t>(std::nullopt, 1, DECIMAL(12, 2)),
makeConstant<int128_t>(std::nullopt, 1, DECIMAL(38, 2))});
makeNullableFlatVector<Timestamp>({std::nullopt})});

for (auto i = 0; i < partitionColumnNames.size(); i++) {
std::vector<column_index_t> partitionChannels = {(column_index_t)i};
Expand Down
Loading
Loading