Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,31 +336,32 @@ HivePrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const {
auto addSynthesizedColumn = [&](const std::string& name,
protocol::hive::ColumnType columnType,
const protocol::ColumnHandle& column) {
if (toHiveColumnType(columnType) ==
velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) {
if (assignments.count(name) == 0) {
assignments.emplace(name, toVeloxColumnHandle(&column, typeParser));
}
}
};
const velox::connector::ColumnHandleMap& assignments) const {
auto hiveLayout =
std::dynamic_pointer_cast<const protocol::hive::HiveTableLayoutHandle>(
tableHandle.connectorTableLayout);
VELOX_CHECK_NOT_NULL(
hiveLayout,
"Unexpected layout type {}",
tableHandle.connectorTableLayout->_type);

std::unordered_set<std::string> columnNames;
std::vector<velox::connector::hive::HiveColumnHandlePtr> columnHandles;
for (const auto& entry : hiveLayout->partitionColumns) {
assignments.emplace(entry.name, toVeloxColumnHandle(&entry, typeParser));
if (columnNames.emplace(entry.name).second) {
columnHandles.emplace_back(
std::dynamic_pointer_cast<velox::connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&entry, typeParser))));
}
}

// Add synthesized columns to the TableScanNode columnHandles as well.
for (const auto& entry : hiveLayout->predicateColumns) {
addSynthesizedColumn(entry.first, entry.second.columnType, entry.second);
if (columnNames.emplace(entry.second.name).second) {
columnHandles.emplace_back(
std::dynamic_pointer_cast<velox::connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&entry.second, typeParser))));
}
}

auto hiveTableHandle =
Expand All @@ -384,6 +385,7 @@ HivePrestoToVeloxConnector::toVeloxTableHandle(
tableName,
hiveLayout->dataColumns,
tableHandle,
columnHandles,
hiveLayout->tableParameters,
exprConverter,
typeParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<velox::connector::ConnectorInsertTableHandle>
toVeloxInsertTableHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toIcebergTableHandle(
const std::string& tableName,
const protocol::List<protocol::Column>& dataColumns,
const protocol::TableHandle& tableHandle,
const protocol::Map<protocol::String, protocol::String>& tableParameters,
const std::vector<velox::connector::hive::HiveColumnHandlePtr>&
columnHandles,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser) {
velox::common::SubfieldFilters subfieldFilters;
Expand Down Expand Up @@ -96,22 +97,15 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toIcebergTableHandle(
finalDataColumns = ROW(std::move(names), std::move(types));
}

std::unordered_map<std::string, std::string> finalTableParameters = {};
if (!tableParameters.empty()) {
finalTableParameters.reserve(tableParameters.size());
for (const auto& [key, value] : tableParameters) {
finalTableParameters[key] = value;
}
}

return std::make_unique<velox::connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns,
finalTableParameters);
std::unordered_map<std::string, std::string>{},
columnHandles);
}

} // namespace
Expand Down Expand Up @@ -212,18 +206,7 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const {
auto addSynthesizedColumn = [&](const std::string& name,
protocol::hive::ColumnType columnType,
const protocol::ColumnHandle& column) {
if (toHiveColumnType(columnType) ==
velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) {
if (assignments.count(name) == 0) {
assignments.emplace(name, toVeloxColumnHandle(&column, typeParser));
}
}
};

const velox::connector::ColumnHandleMap& assignments) const {
auto icebergLayout = std::dynamic_pointer_cast<
const protocol::iceberg::IcebergTableLayoutHandle>(
tableHandle.connectorTableLayout);
Expand All @@ -232,14 +215,25 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
"Unexpected layout type {}",
tableHandle.connectorTableLayout->_type);

std::unordered_set<std::string> columnNames;
std::vector<velox::connector::hive::HiveColumnHandlePtr> columnHandles;
for (const auto& entry : icebergLayout->partitionColumns) {
assignments.emplace(
entry.columnIdentity.name, toVeloxColumnHandle(&entry, typeParser));
if (columnNames.emplace(entry.columnIdentity.name).second) {
columnHandles.emplace_back(
std::dynamic_pointer_cast<
const velox::connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&entry, typeParser))));
}
}

// Add synthesized columns to the TableScanNode columnHandles as well.
for (const auto& entry : icebergLayout->predicateColumns) {
addSynthesizedColumn(entry.first, entry.second.columnType, entry.second);
if (columnNames.emplace(entry.second.columnIdentity.name).second) {
columnHandles.emplace_back(
std::dynamic_pointer_cast<
const velox::connector::hive::HiveColumnHandle>(
std::shared_ptr(toVeloxColumnHandle(&entry.second, typeParser))));
}
}

auto icebergTableHandle =
Expand All @@ -265,7 +259,7 @@ IcebergPrestoToVeloxConnector::toVeloxTableHandle(
tableName,
icebergLayout->dataColumns,
tableHandle,
{},
columnHandles,
exprConverter,
typeParser);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TpchPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const {
const velox::connector::ColumnHandleMap& assignments) const {
auto tpchLayout =
std::dynamic_pointer_cast<const protocol::tpch::TpchTableLayoutHandle>(
tableHandle.connectorTableLayout);
Expand Down Expand Up @@ -154,7 +154,7 @@ TpcdsPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const {
const velox::connector::ColumnHandleMap& assignments) const {
auto tpcdsLayout =
std::dynamic_pointer_cast<const protocol::tpcds::TpcdsTableLayoutHandle>(
tableHandle.connectorTableLayout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const = 0;
const velox::connector::ColumnHandleMap& assignments) const = 0;

[[nodiscard]] virtual std::unique_ptr<
velox::connector::ConnectorInsertTableHandle>
Expand Down Expand Up @@ -133,7 +133,7 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Expand All @@ -157,7 +157,7 @@ class TpcdsPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
const std::string& tableName,
const protocol::List<protocol::Column>& dataColumns,
const protocol::TableHandle& tableHandle,
const std::vector<velox::connector::hive::HiveColumnHandlePtr>&
columnHandles,
const protocol::Map<protocol::String, protocol::String>& tableParameters,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser) {
Expand Down Expand Up @@ -788,16 +790,6 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
finalDataColumns = ROW(std::move(names), std::move(types));
}

if (tableParameters.empty()) {
return std::make_unique<connector::hive::HiveTableHandle>(
tableHandle.connectorId,
tableName,
isPushdownFilterEnabled,
std::move(subfieldFilters),
remainingFilter,
finalDataColumns);
}

std::unordered_map<std::string, std::string> finalTableParameters = {};
finalTableParameters.reserve(tableParameters.size());
for (const auto& [key, value] : tableParameters) {
Expand All @@ -811,7 +803,8 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
std::move(subfieldFilters),
remainingFilter,
finalDataColumns,
finalTableParameters);
finalTableParameters,
columnHandles);
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ std::unique_ptr<velox::connector::ConnectorTableHandle> toHiveTableHandle(
const std::string& tableName,
const protocol::List<protocol::Column>& dataColumns,
const protocol::TableHandle& tableHandle,
const std::vector<velox::connector::hive::HiveColumnHandlePtr>&
columnHandles,
const protocol::Map<protocol::String, protocol::String>& tableParameters,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ SystemPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const {
const velox::connector::ColumnHandleMap& assignments) const {
auto systemLayout =
std::dynamic_pointer_cast<const protocol::SystemTableLayoutHandle>(
tableHandle.connectorTableLayout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class SystemPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ArrowPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& /*exprConverter*/,
const TypeParser& /*typeParser*/,
velox::connector::ColumnHandleMap& assignments) const {
const velox::connector::ColumnHandleMap& /*assignments*/) const {
return std::make_unique<presto::ArrowFlightTableHandle>(
tableHandle.connectorId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ArrowPrestoToVeloxConnector final : public PrestoToVeloxConnector {
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
velox::connector::ColumnHandleMap& assignments) const final;
const velox::connector::ColumnHandleMap& assignments) const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
Expand Down
Loading