Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ set(
)
option(VELOX_ENABLE_EXEC "Build exec." ON)
option(VELOX_ENABLE_AGGREGATES "Build aggregates." ON)
option(VELOX_ENABLE_HIVE_CONNECTOR "Build Hive connector." ON)
option(VELOX_ENABLE_HIVE_CONNECTOR "Build the Hive connector." ON)
#option(VELOX_ENABLE_HIVE_NEW_CONNECTOR "Build the new Hive connector." ON)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unused lines

#option(VELOX_ENABLE_ICEBERG_CONNECTOR "Build the ICEBERG connector that does NOT depend on the new Hive connector." ON)
option(VELOX_ENABLE_TPCH_CONNECTOR "Build TPC-H connector." ON)
option(VELOX_ENABLE_TPCDS_CONNECTOR "Build TPC-DS connector." ON)
option(VELOX_ENABLE_PRESTO_FUNCTIONS "Build Presto SQL functions." ON)
Expand Down Expand Up @@ -722,6 +724,7 @@ include_directories(.)

# Adding this down here prevents warnings in dependencies from stopping the
# build
set(TREAT_WARNINGS_AS_ERRORS OFF)
if("${TREAT_WARNINGS_AS_ERRORS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
endif()
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ velox_link_libraries(velox_connector velox_common_config velox_vector)

add_subdirectory(fuzzer)

add_subdirectory(lakehouse)

if(${VELOX_ENABLE_HIVE_CONNECTOR})
add_subdirectory(hive)
endif()
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class DataSink {
uint64_t recodeTimeNs{0};
uint64_t compressionTimeNs{0};

common::SpillStats spillStats;
velox::common::SpillStats spillStats;

bool empty() const;

Expand Down
17 changes: 17 additions & 0 deletions velox/connectors/lakehouse/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#add_subdirectory(storage_adapters)
add_subdirectory(iceberg)

59 changes: 59 additions & 0 deletions velox/connectors/lakehouse/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(
velox_lakehouse_iceberg_connector
ConnectorConfigBase.cpp
DataSourceBase.cpp
ConnectorSplitBase.cpp
ConnectorUtil.cpp
FileHandle.cpp
SplitReaderBase.cpp
TableHandleBase.cpp
IcebergConfig.cpp
IcebergConnector.cpp
IcebergConnectorSplit.cpp
IcebergConnectorUtil.cpp
IcebergPartitionUtil.cpp
PartitionIdGenerator.cpp
IcebergDataSource.cpp
IcebergPartitionFunction.cpp
IcebergSplitReader.cpp
IcebergTableHandle.cpp
PositionalDeleteFileReader.cpp
)

velox_link_libraries(
velox_lakehouse_iceberg_connector
PRIVATE velox_connector
velox_common_io
velox_dwio_catalog_fbhive
velox_buffer
velox_caching
velox_common_compression
velox_common_config
velox_dwio_common_encryption
velox_dwio_common_exception
velox_exception
velox_expression
velox_memory
velox_type_tz
Boost::regex
Folly::folly
glog::glog
protobuf::libprotobuf)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()
151 changes: 151 additions & 0 deletions velox/connectors/lakehouse/iceberg/ConnectorConfigBase.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file names should not be ConnectorXXX any more. They need to be called IcebergXXX

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's already in the final leaf folder, there should not be XXXBase classes or files anymore. These Base classes shall be merged with the final classes, e.g. ConnectorConfigBase shall be merged into IcebergConfig

#include "ConnectorConfigBase.h"

namespace facebook::velox::connector::lakehouse::iceberg {

std::string ConnectorConfigBase::gcsEndpoint() const {
return config_->get<std::string>(kGcsEndpoint, std::string(""));
}

std::string ConnectorConfigBase::gcsCredentialsPath() const {
return config_->get<std::string>(kGcsCredentialsPath, std::string(""));
}

std::optional<int> ConnectorConfigBase::gcsMaxRetryCount() const {
return static_cast<std::optional<int>>(config_->get<int>(kGcsMaxRetryCount));
}

std::optional<std::string> ConnectorConfigBase::gcsMaxRetryTime() const {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kGcsMaxRetryTime));
}

bool ConnectorConfigBase::isOrcUseColumnNames(
const config::ConfigBase* session) const {
return session->get<bool>(
kOrcUseColumnNamesSession, config_->get<bool>(kOrcUseColumnNames, false));
}

bool ConnectorConfigBase::isParquetUseColumnNames(
const config::ConfigBase* session) const {
return session->get<bool>(
kParquetUseColumnNamesSession,
config_->get<bool>(kParquetUseColumnNames, false));
}

bool ConnectorConfigBase::isFileColumnNamesReadAsLowerCase(
const config::ConfigBase* session) const {
return session->get<bool>(
kFileColumnNamesReadAsLowerCaseSession,
config_->get<bool>(kFileColumnNamesReadAsLowerCase, false));
}

bool ConnectorConfigBase::isPartitionPathAsLowerCase(
const config::ConfigBase* session) const {
return session->get<bool>(kPartitionPathAsLowerCaseSession, true);
}

bool ConnectorConfigBase::allowNullPartitionKeys(
const config::ConfigBase* session) const {
return session->get<bool>(
kAllowNullPartitionKeysSession,
config_->get<bool>(kAllowNullPartitionKeys, true));
}

int64_t ConnectorConfigBase::maxCoalescedBytes(
const config::ConfigBase* session) const {
return session->get<int64_t>(
kMaxCoalescedBytesSession,
config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20)); // 128MB
}

int32_t ConnectorConfigBase::maxCoalescedDistanceBytes(
const config::ConfigBase* session) const {
const auto distance = config::toCapacity(
session->get<std::string>(
kMaxCoalescedDistanceSession,
config_->get<std::string>(kMaxCoalescedDistance, "512kB")),
config::CapacityUnit::BYTE);
VELOX_USER_CHECK_LE(
distance,
std::numeric_limits<int32_t>::max(),
"The max merge distance to combine read requests must be less than 2GB."
" Got {} bytes.",
distance);
return int32_t(distance);
}

int32_t ConnectorConfigBase::prefetchRowGroups() const {
return config_->get<int32_t>(kPrefetchRowGroups, 1);
}

int32_t ConnectorConfigBase::loadQuantum(const config::ConfigBase* session) const {
return session->get<int32_t>(
kLoadQuantumSession, config_->get<int32_t>(kLoadQuantum, 8 << 20));
}

int32_t ConnectorConfigBase::numCacheFileHandles() const {
return config_->get<int32_t>(kNumCacheFileHandles, 20'000);
}

uint64_t ConnectorConfigBase::fileHandleExpirationDurationMs() const {
return config_->get<uint64_t>(kFileHandleExpirationDurationMs, 0);
}

bool ConnectorConfigBase::isFileHandleCacheEnabled() const {
return config_->get<bool>(kEnableFileHandleCache, true);
}

std::string ConnectorConfigBase::writeFileCreateConfig() const {
return config_->get<std::string>(kWriteFileCreateConfig, "");
}

uint64_t ConnectorConfigBase::footerEstimatedSize() const {
return config_->get<uint64_t>(kFooterEstimatedSize, 256UL << 10);
}

uint64_t ConnectorConfigBase::filePreloadThreshold() const {
return config_->get<uint64_t>(kFilePreloadThreshold, 8UL << 20);
}

uint8_t ConnectorConfigBase::readTimestampUnit(
const config::ConfigBase* session) const {
const auto unit = session->get<uint8_t>(
kReadTimestampUnitSession,
config_->get<uint8_t>(kReadTimestampUnit, 3 /*milli*/));
VELOX_CHECK(
unit == 3 || unit == 6 /*micro*/ || unit == 9 /*nano*/,
"Invalid timestamp unit.");
return unit;
}

bool ConnectorConfigBase::readTimestampPartitionValueAsLocalTime(
const config::ConfigBase* session) const {
return session->get<bool>(
kReadTimestampPartitionValueAsLocalTimeSession,
config_->get<bool>(kReadTimestampPartitionValueAsLocalTime, true));
}

bool ConnectorConfigBase::readStatsBasedFilterReorderDisabled(
const config::ConfigBase* session) const {
return session->get<bool>(
kReadStatsBasedFilterReorderDisabledSession,
config_->get<bool>(kReadStatsBasedFilterReorderDisabled, false));
}

} // namespace facebook::velox::connector::lakehouse::iceberg
Loading