Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
std::shared_ptr<std::string> extraFileInfo;
std::unordered_map<std::string, std::string> serdeParameters;

/// These represent columns like $file_size, $file_modified_time that are
/// associated with the HiveSplit.
std::unordered_map<std::string, std::string> infoColumns;

HiveConnectorSplit(
const std::string& connectorId,
const std::string& _filePath,
Expand All @@ -51,7 +55,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _serdeParameters = {},
int64_t _splitWeight = 0)
int64_t _splitWeight = 0,
const std::unordered_map<std::string, std::string>& _infoColumns = {})
: ConnectorSplit(connectorId, _splitWeight),
filePath(_filePath),
fileFormat(_fileFormat),
Expand All @@ -61,7 +66,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
tableBucketNumber(_tableBucketNumber),
customSplitInfo(_customSplitInfo),
extraFileInfo(_extraFileInfo),
serdeParameters(_serdeParameters) {}
serdeParameters(_serdeParameters),
infoColumns(_infoColumns) {}

std::string toString() const override {
if (tableBucketNumber.has_value()) {
Expand Down
28 changes: 22 additions & 6 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ inline uint8_t parseDelimiter(const std::string& delim) {
return stoi(delim);
}

inline bool isSynthesizedColumn(
const std::string& name,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns) {
return name == kPath || name == kBucket || infoColumns.count(name) != 0;
}

} // namespace

const std::string& getColumnName(const common::Subfield& subfield) {
Expand Down Expand Up @@ -273,9 +280,13 @@ void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type) {
}
}

void checkColumnNameLowerCase(const SubfieldFilters& filters) {
void checkColumnNameLowerCase(
const SubfieldFilters& filters,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns) {
for (auto& pair : filters) {
if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
if (auto name = pair.first.toString();
isSynthesizedColumn(name, infoColumns)) {
continue;
}
auto& path = pair.first.path();
Expand Down Expand Up @@ -310,14 +321,17 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
filterSubfields;
std::vector<SubfieldSpec> subfieldSpecs;
for (auto& [subfield, _] : filters) {
if (auto name = subfield.toString();
name != kPath && name != kBucket && partitionKeys.count(name) == 0) {
!isSynthesizedColumn(name, infoColumns) &&
partitionKeys.count(name) == 0) {
filterSubfields[getColumnName(subfield)].push_back(&subfield);
}
}
Expand Down Expand Up @@ -364,11 +378,13 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
// SelectiveColumnReader doesn't support constant columns with filters,
// hence, we can't have a filter for a $path or $bucket column.
//
// Unfortunately, Presto happens to specify a filter for $path or
// $bucket column. This filter is redundant and needs to be removed.
// Unfortunately, Presto happens to specify a filter for $path, $file_size,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there is an issue for this on the Presto side?

// $file_modified_time or $bucket column. This filter is redundant and needs
// to be removed.
// TODO Remove this check when Presto is fixed to not specify a filter
// on $path, $bucket, $file_size and $file_modified_time columns.
if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
if (auto name = pair.first.toString();
isSynthesizedColumn(name, infoColumns)) {
continue;
}
auto fieldSpec = spec->getOrCreateChild(pair.first);
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ const std::string& getColumnName(const common::Subfield& subfield);

void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type);

void checkColumnNameLowerCase(const SubfieldFilters& filters);
void checkColumnNameLowerCase(
const SubfieldFilters& filters,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns);

void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);

Expand All @@ -52,6 +55,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& dataColumns,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool);

void configureReaderOptions(
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ HiveDataSource::HiveDataSource(
if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) {
partitionKeys_.emplace(handle->name(), handle);
}

if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) {
infoColumns_.emplace(handle->name(), handle);
}
}

std::vector<std::string> readerRowNames;
Expand Down Expand Up @@ -88,7 +92,7 @@ HiveDataSource::HiveDataSource(
if (hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties())) {
checkColumnNameLowerCase(outputType_);
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters(), infoColumns_);
checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
}

Expand Down Expand Up @@ -149,6 +153,7 @@ HiveDataSource::HiveDataSource(
filters,
hiveTableHandle_->dataColumns(),
partitionKeys_,
infoColumns_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class HiveDataSource : public DataSource {

// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;

// Column handles for the Split info columns keyed on their column names.
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
infoColumns_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
Expand Down
18 changes: 15 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ std::vector<TypePtr> SplitReader::adaptColumns(
auto* childSpec = childrenSpecs[i].get();
const std::string& fieldName = childSpec->fieldName();

auto iter = hiveSplit_->partitionKeys.find(fieldName);
if (iter != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, iter->second);
if (auto it = hiveSplit_->partitionKeys.find(fieldName);
it != hiveSplit_->partitionKeys.end()) {
setPartitionValue(childSpec, fieldName, it->second);
} else if (fieldName == kPath) {
auto constantVec = std::make_shared<ConstantVector<StringView>>(
connectorQueryCtx_->memoryPool(),
Expand All @@ -240,6 +240,18 @@ std::vector<TypePtr> SplitReader::adaptColumns(
std::move(bucket));
childSpec->setConstantValue(constantVec);
}
} else if (auto iter = hiveSplit_->infoColumns.find(fieldName);
iter != hiveSplit_->infoColumns.end()) {
auto infoColumnType =
readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName));
auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
newConstantFromString,
infoColumnType->kind(),
infoColumnType,
iter->second,
1,
connectorQueryCtx_->memoryPool());
childSpec->setConstantValue(constant);
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
Expand Down
18 changes: 14 additions & 4 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@ HiveIcebergSplit::HiveIcebergSplit(
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
const std::shared_ptr<std::string>& _extraFileInfo)
const std::shared_ptr<std::string>& _extraFileInfo,
const std::unordered_map<std::string, std::string>& _infoColumns)
: HiveConnectorSplit(
_connectorId,
_filePath,
_fileFormat,
_start,
_length,
_partitionKeys,
_tableBucketNumber) {
_tableBucketNumber,
_customSplitInfo,
_extraFileInfo,
{},
0,
_infoColumns) {
Comment thread
aditi-pandit marked this conversation as resolved.
Outdated
// TODO: Deserialize _extraFileInfo to get deleteFiles;
}

Expand All @@ -54,7 +60,8 @@ HiveIcebergSplit::HiveIcebergSplit(
std::optional<int32_t> _tableBucketNumber,
const std::unordered_map<std::string, std::string>& _customSplitInfo,
const std::shared_ptr<std::string>& _extraFileInfo,
std::vector<IcebergDeleteFile> _deletes)
std::vector<IcebergDeleteFile> _deletes,
const std::unordered_map<std::string, std::string>& _infoColumns)
: HiveConnectorSplit(
_connectorId,
_filePath,
Expand All @@ -64,6 +71,9 @@ HiveIcebergSplit::HiveIcebergSplit(
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
_extraFileInfo),
_extraFileInfo,
{},
0,
_infoColumns),
deleteFiles(_deletes) {}
} // namespace facebook::velox::connector::hive::iceberg
6 changes: 4 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {});
const std::shared_ptr<std::string>& _extraFileInfo = {},
const std::unordered_map<std::string, std::string>& _infoColumns = {});

// For tests only
HiveIcebergSplit(
Expand All @@ -50,7 +51,8 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
std::optional<int32_t> _tableBucketNumber = std::nullopt,
const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
const std::shared_ptr<std::string>& _extraFileInfo = {},
std::vector<IcebergDeleteFile> deletes = {});
std::vector<IcebergDeleteFile> deletes = {},
const std::unordered_map<std::string, std::string>& _infoColumns = {});
};

} // namespace facebook::velox::connector::hive::iceberg
17 changes: 14 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) {
auto rowType = ROW({{"c0", columnType}});
auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"});
auto scanSpec = makeScanSpec(
rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get());
rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
validateNullConstant(*c0c0, *BIGINT());
auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1");
Expand Down Expand Up @@ -122,6 +122,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant());
Expand All @@ -144,6 +145,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->maxArrayElementsCount(), 2);
Expand All @@ -160,7 +162,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) {
auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"});
auto groupedSubfields = groupSubfields(subfields);
VELOX_ASSERT_USER_THROW(
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()),
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()),
"Non-positive array subscript cannot be push down");
}

Expand All @@ -175,6 +177,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter();
Expand All @@ -200,6 +203,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
Expand All @@ -218,6 +222,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
Expand All @@ -240,6 +245,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
auto* keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand Down Expand Up @@ -267,6 +273,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand All @@ -285,6 +292,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand All @@ -300,6 +308,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
{},
nullptr,
{},
{},
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
Expand Down Expand Up @@ -335,6 +344,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) {
filters,
ROW({{"c0", c0Type}, {"c1", c1Type}}),
{},
{},
pool_.get());
auto c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->isConstant());
Expand Down Expand Up @@ -379,6 +389,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) {
{},
nullptr,
{},
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->children().size(), 2);
Expand All @@ -392,7 +403,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) {
SubfieldFilters filters;
filters.emplace(Subfield("ds"), exec::equal("2023-10-13"));
auto scanSpec = makeScanSpec(
rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get());
rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get());
ASSERT_TRUE(scanSpec->childByName("c0")->projectOut());
ASSERT_FALSE(scanSpec->childByName("ds")->projectOut());
}
Expand Down
Loading