Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion velox/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(velox_connector Connector.cpp)
add_library(velox_connector Connector.cpp WriteProtocol.cpp)

target_link_libraries(velox_connector velox_config velox_vector)

Expand Down
41 changes: 27 additions & 14 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Filter;
}
namespace facebook::velox::core {
class ITypedExpr;
}
} // namespace facebook::velox::core
namespace facebook::velox::exec {
class ExprSet;
}
Expand Down Expand Up @@ -171,21 +171,25 @@ class ConnectorQueryCtx {
public:
ConnectorQueryCtx(
memory::MemoryPool* pool,
Config* config,
const Config* FOLLY_NONNULL connectorConfig,
ExpressionEvaluator* expressionEvaluator,
memory::MappedMemory* mappedMemory,
const std::string& scanId)
memory::MappedMemory* FOLLY_NONNULL mappedMemory,
const std::string& taskId,
const std::string& planNodeId,
int driverId)
: pool_(pool),
config_(config),
config_(connectorConfig),
expressionEvaluator_(expressionEvaluator),
mappedMemory_(mappedMemory),
scanId_(scanId) {}
scanId_(fmt::format("{}.{}", taskId, planNodeId)),
taskId_(taskId),
driverId_(driverId) {}

memory::MemoryPool* memoryPool() const {
return pool_;
}

Config* config() const {
const Config* config() const {
return config_;
}

Expand All @@ -199,21 +203,30 @@ class ConnectorQueryCtx {
return mappedMemory_;
}

// Returns an id that allows sharing state between different threads
// of the same scan. This is typically a query id plus the scan's
// PlanNodeId. This is used for locating a scanTracker, which tracks
// the read density of columns for prefetch and other memory
// hierarchy purposes.
// This is a combination of task id and the scan's PlanNodeId. This is an id
// that allows sharing state between different threads of the same scan. This
// is used for locating a scanTracker, which tracks the read density of
// columns for prefetch and other memory hierarchy purposes.
const std::string& scanId() const {
return scanId_;
}

const std::string& taskId() const {
return taskId_;
}

int driverId() const {
return driverId_;
}

private:
memory::MemoryPool* pool_;
Config* config_;
const Config* config_;
ExpressionEvaluator* expressionEvaluator_;
memory::MappedMemory* mappedMemory_;
std::string scanId_;
const std::string scanId_;
const std::string taskId_;
const int driverId_;
};

class Connector {
Expand Down
74 changes: 74 additions & 0 deletions velox/connectors/WriteProtocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/WriteProtocol.h"
#include "velox/connectors/Connector.h"
#include "velox/vector/ConstantVector.h"

#include <unordered_map>

namespace facebook::velox::connector {

namespace {

using RegisteredWriteProtocols = std::unordered_map<
WriteProtocol::CommitStrategy,
std::shared_ptr<WriteProtocol>>;

RegisteredWriteProtocols& registeredWriteProtocols() {
// Default write protocol registered
static RegisteredWriteProtocols protocols{
{WriteProtocol::CommitStrategy::kNoCommit,
std::make_shared<DefaultWriteProtocol>()}};
return protocols;
}

} // namespace

// static
bool WriteProtocol::registerWriteProtocol(
WriteProtocol::CommitStrategy commitStrategy,
std::shared_ptr<WriteProtocol> writeProtocol) {
return registeredWriteProtocols()
.insert_or_assign(commitStrategy, writeProtocol)
.second;
}

// static
std::shared_ptr<WriteProtocol> WriteProtocol::getWriteProtocol(
CommitStrategy commitStrategy) {
const auto iter = registeredWriteProtocols().find(commitStrategy);
// Fail if no WriteProtocol has been registered for the given CommitStrategy.
VELOX_CHECK(
iter != registeredWriteProtocols().end(),
"No write protocol found for commit strategy {}",
commitStrategyToString(commitStrategy));
return iter->second;
}

RowVectorPtr DefaultWriteProtocol::commit(
const WriteInfo& writeInfo,
velox::memory::MemoryPool* FOLLY_NONNULL pool) {
return std::make_shared<RowVector>(
pool,
ROW({"rowCount"}, {BIGINT()}),
BufferPtr(nullptr),
1,
std::vector<VectorPtr>{std::make_shared<ConstantVector<int64_t>>(
pool, 1, false, BIGINT(), writeInfo.numWrittenRows())});
}

} // namespace facebook::velox::connector
123 changes: 123 additions & 0 deletions velox/connectors/WriteProtocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/vector/ComplexVector.h"

namespace facebook::velox::connector {

class ConnectorInsertTableHandle;
class ConnectorQueryCtx;

/// Interface to provide key parameters for writers. Ex., write and commit
/// locations, append or overwrite, etc.
class WriterParameters {
public:
virtual ~WriterParameters() = default;
};

/// Interface that includes all info from write. It will be passed to commit()
/// of WriteProtocol.
class WriteInfo {
public:
virtual ~WriteInfo() = default;

virtual vector_size_t numWrittenRows() const = 0;
};

/// Abstraction for write behaviors. Systems register WriteProtocols
/// by CommitStrategy. Writers call getWriteProtocol() to get the registered
/// instance of the WriteProtocol when needed.
class WriteProtocol {
public:
/// Represents the commit strategy of a write protocol.
enum class CommitStrategy {
kNoCommit, // No more commit actions are needed.
kTaskCommit // Task level commit is needed.
};

virtual ~WriteProtocol() {}

/// Return the commit strategy of the write protocol. It will be the commit
/// strategy that the write protocol registers for.
virtual CommitStrategy commitStrategy() const = 0;

/// Return a string encoding of the given commit strategy.
static std::string commitStrategyToString(CommitStrategy commitStrategy) {
switch (commitStrategy) {
case CommitStrategy::kNoCommit:
return "NO_COMMIT";
case CommitStrategy::kTaskCommit:
return "TASK_COMMIT";
default:
VELOX_UNREACHABLE();
}
}

/// Perform actions of commit. It would be called by the writers and could
/// return outputs that would be included in writer outputs. Return nullptr if
/// the commit action does not need to add output to the table writer output.
virtual RowVectorPtr commit(
const WriteInfo& writeInfo,
velox::memory::MemoryPool* FOLLY_NONNULL pool) {
return nullptr;
}

/// Return parameters for writers. Ex., write and commit locations. Return
/// nullptr if the writer does not need parameters from the write protocol.
virtual std::shared_ptr<const WriterParameters> getWriterParameters(
const std::shared_ptr<const velox::connector::ConnectorInsertTableHandle>&
tableHandle,
const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL
connectorQueryCtx) const = 0;

/// Register a WriteProtocol implementation for the given CommitStrategy. If
/// the CommitStrategy has already been registered, it will replace the old
/// WriteProtocol implementation with the new one and return false; otherwise
/// return true.
static bool registerWriteProtocol(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason this API doesn't take CommitStrategy? It would be more natural to specify both CommitStrategy and WriteProtocol when registering as opposed to fetching CommitStrategy from the protocol.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, used to pass CommitStrategy to this func together wi/ the WriteProtocol. Changing it back to the more natural way.

CommitStrategy commitStrategy,
std::shared_ptr<WriteProtocol> writeProtocol);

/// Return the instance of the WriteProtocol registered for
/// the given CommitStrategy.
static std::shared_ptr<WriteProtocol> getWriteProtocol(
CommitStrategy commitStrategy);
};

class DefaultWriteProtocol : public WriteProtocol {
public:
~DefaultWriteProtocol() override {}

CommitStrategy commitStrategy() const override {
return CommitStrategy::kNoCommit;
}

RowVectorPtr commit(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to define this again. The base implementation is the same.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still see this method here. I assume it can be removed, no?

const WriteInfo& writeInfo,
velox::memory::MemoryPool* FOLLY_NONNULL pool) override;

std::shared_ptr<const WriterParameters> getWriterParameters(
const std::shared_ptr<const velox::connector::ConnectorInsertTableHandle>&
tableHandle,
const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL
connectorQueryCtx) const override {
return std::make_shared<WriterParameters>();
}
};

} // namespace facebook::velox::connector
3 changes: 2 additions & 1 deletion velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(velox_hive_connector OBJECT HiveConnector.cpp FileHandle.cpp)
add_library(velox_hive_connector OBJECT HiveConnector.cpp FileHandle.cpp
HiveWriteProtocol.cpp)

target_link_libraries(velox_hive_connector velox_connector
velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_file)
Expand Down
Loading