Skip to content

Commit

Permalink
MINIFICPP-2146 Add support for SMB networking protocol
Browse files Browse the repository at this point in the history
MINIFICPP-2147 PutSmb
MINIFICPP-2148 FetchSmb
MINIFICPP-2150 ListSmb
  • Loading branch information
martinzink committed Aug 18, 2023
1 parent fbe2176 commit 8dc4bb2
Show file tree
Hide file tree
Showing 55 changed files with 2,532 additions and 764 deletions.
1 change: 1 addition & 0 deletions cmake/MiNiFiOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ if (WIN32)
add_minifi_option(MSI_REDISTRIBUTE_UCRT_NONASL "Redistribute Universal C Runtime DLLs with the MSI generated by CPack. The resulting MSI is not distributable under Apache 2.0." OFF)
add_minifi_option(ENABLE_WEL "Enables the suite of Windows Event Log extensions." OFF)
add_minifi_option(ENABLE_PDH "Enables PDH support." OFF)
add_minifi_option(ENABLE_SMB "Enables SMB support." ON)
endif()

if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ TEST_CASE("GetFile PutFile dynamic attribute", "[expressionLanguageTestGetFilePu
plan->addProcessor("LogAttribute", "LogAttribute", core::Relationship("success", "description"), true);
auto put_file = plan->addProcessor("PutFile", "PutFile", core::Relationship("success", "description"), true);
plan->setProperty(put_file, minifi::processors::PutFile::Directory, (out_dir / "${extracted_attr_name}").string());
plan->setProperty(put_file, minifi::processors::PutFile::ConflictResolution, minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE);
plan->setProperty(put_file, minifi::processors::PutFile::ConflictResolution, magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::replace));
plan->setProperty(put_file, minifi::processors::PutFile::CreateDirs, "true");

// Write test input
Expand Down
2 changes: 1 addition & 1 deletion extensions/http-curl/tests/unit/AlertTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class AlertHandler : public ServerAwareHandler {
std::string id(doc["agentId"].GetString(), doc["agentId"].GetStringLength());
REQUIRE(id == agent_id_);
std::vector<std::string> batch;
for (size_t i = 0; i < doc["alerts"].Size(); ++i) {
for (rapidjson::SizeType i = 0; i < doc["alerts"].Size(); ++i) {
REQUIRE(doc["alerts"][i].IsString());
batch.emplace_back(doc["alerts"][i].GetString(), doc["alerts"][i].GetStringLength());
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/python/ExecutePythonProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ExecutePythonProcessor : public core::Processor {

std::string script_to_exec_;
bool reload_on_script_change_;
std::optional<std::filesystem::file_time_type> last_script_write_time_;
std::optional<std::chrono::file_clock::time_point> last_script_write_time_;
std::string script_file_path_;
std::shared_ptr<core::logging::Logger> python_logger_;
std::unique_ptr<PythonScriptEngine> python_script_engine_;
Expand Down
1 change: 0 additions & 1 deletion extensions/python/tests/ExecutePythonProcessorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class ExecutePythonProcessorTestBase {
plan_ = testController_->createPlan();
logTestController_.setDebug<TestPlan>();
logTestController_.setDebug<minifi::processors::PutFile>();
logTestController_.setDebug<minifi::processors::PutFile::ReadCallback>();
}

auto getScriptFullPath(const std::filesystem::path& script_file_name) {
Expand Down
2 changes: 1 addition & 1 deletion extensions/sftp/tests/FetchSFTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class FetchSFTPTestsFixture {

// Configure PutFile processor
plan->setProperty(put_file, "Directory", (dst_dir / "${path}").string());
plan->setProperty(put_file, "Conflict Resolution Strategy", minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
plan->setProperty(put_file, "Conflict Resolution Strategy", magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::fail));
plan->setProperty(put_file, "Create Missing Directories", "true");
}

Expand Down
2 changes: 1 addition & 1 deletion extensions/sftp/tests/ListSFTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ListSFTPTestsFixture {
}

// Create source file
void createFile(const std::filesystem::path& relative_path, const std::string& content, std::optional<std::filesystem::file_time_type> modification_time) {
void createFile(const std::filesystem::path& relative_path, const std::string& content, std::optional<std::chrono::file_clock::time_point> modification_time) {
std::fstream file;
std::filesystem::path full_path = src_dir / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
Expand Down
4 changes: 2 additions & 2 deletions extensions/sftp/tests/ListThenFetchSFTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ListThenFetchSFTPTestsFixture {

// Configure PutFile processor
plan->setProperty(put_file, "Directory", (dst_dir / "${path}").string());
plan->setProperty(put_file, "Conflict Resolution Strategy", minifi::processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
plan->setProperty(put_file, "Conflict Resolution Strategy", magic_enum::enum_name(minifi::processors::PutFile::FileExistsResolutionStrategy::fail));
plan->setProperty(put_file, "Create Missing Directories", "true");
}

Expand All @@ -148,7 +148,7 @@ class ListThenFetchSFTPTestsFixture {
}

// Create source file
void createFile(const std::string& relative_path, const std::string& content, std::optional<std::filesystem::file_time_type> modification_time) {
void createFile(const std::string& relative_path, const std::string& content, std::optional<std::chrono::file_clock::time_point> modification_time) {
std::fstream file;
std::filesystem::path full_path = src_dir / "vfs" / relative_path;
std::filesystem::create_directories(full_path.parent_path());
Expand Down
2 changes: 1 addition & 1 deletion extensions/sftp/tests/PutSFTPTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class PutSFTPTestsFixture {
REQUIRE(false == file.good());
}

void testModificationTime(const std::string& relative_path, std::filesystem::file_time_type mtime) {
void testModificationTime(const std::string& relative_path, std::chrono::file_clock::time_point mtime) {
auto result_file = dst_dir / "vfs" / relative_path;
REQUIRE(mtime == utils::file::last_write_time(result_file).value());
}
Expand Down
34 changes: 34 additions & 0 deletions extensions/smb/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

if (NOT (WIN32 AND ENABLE_SMB))
return()
endif()

include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)

file(GLOB SOURCES "*.cpp")

add_library(minifi-smb SHARED ${SOURCES})
target_link_libraries(minifi-smb ${LIBMINIFI} Mpr)
target_include_directories(minifi-smb PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")

register_extension(minifi-smb "SMB EXTENSIONS" SMB-EXTENSIONS "This enables SMB support" "extensions/smb/tests")

register_extension_linter(minifi-smb-extensions-linter)
84 changes: 84 additions & 0 deletions extensions/smb/FetchSmb.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "FetchSmb.h"
#include "core/Resource.h"
#include "utils/file/FileReaderCallback.h"

namespace org::apache::nifi::minifi::extensions::smb {

void FetchSmb::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}

void FetchSmb::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
gsl_Expects(context);
if (auto connection_controller_name = context->getProperty(FetchSmb::ConnectionControllerService)) {
smb_connection_controller_service_ = std::dynamic_pointer_cast<SmbConnectionControllerService>(context->getControllerService(*connection_controller_name));
}
if (!smb_connection_controller_service_) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing SMB Connection Controller Service");
}
}

namespace {
std::filesystem::path getPath(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
auto remote_file = context.getProperty(FetchSmb::RemoteFile, flow_file);
if (remote_file && !remote_file->empty()) {
if (remote_file->starts_with('/'))
remote_file->erase(remote_file->begin());
return *remote_file;
}
std::filesystem::path path = flow_file->getAttribute(core::SpecialFlowAttribute::PATH).value_or("");
std::filesystem::path filename = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("");
return path / filename;
}
} // namespace

void FetchSmb::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
gsl_Expects(context && session && smb_connection_controller_service_);

auto connection_error = smb_connection_controller_service_->validateConnection();
if (connection_error) {
logger_->log_error("Couldn't establish connection to the specified network location due to %s", connection_error.message());
context->yield();
return;
}

auto flow_file = session->get();
if (!flow_file) {
context->yield();
return;
}

auto path = getPath(*context, flow_file);

try {
session->write(flow_file, utils::FileReaderCallback{smb_connection_controller_service_->getPath() / path});
session->transfer(flow_file, Success);
} catch (const utils::FileReaderCallbackIOError& io_error) {
flow_file->addAttribute(ErrorCode.name, fmt::format("{}", io_error.error_code));
flow_file->addAttribute(ErrorMessage.name, io_error.what());
session->transfer(flow_file, Failure);
return;
}
}

REGISTER_RESOURCE(FetchSmb, Processor);

} // namespace org::apache::nifi::minifi::extensions::smb
88 changes: 88 additions & 0 deletions extensions/smb/FetchSmb.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <optional>
#include <regex>
#include <string>
#include <utility>

#include "SmbConnectionControllerService.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Property.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/OutputAttributeDefinition.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/Enum.h"
#include "utils/ListingStateManager.h"
#include "utils/file/ListedFile.h"
#include "utils/file/FileUtils.h"

namespace org::apache::nifi::minifi::extensions::smb {

class FetchSmb : public core::Processor {
public:
explicit FetchSmb(std::string name, const utils::Identifier& uuid = {})
: core::Processor(std::move(name), uuid) {
}

EXTENSIONAPI static constexpr const char* Description = "Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.";

EXTENSIONAPI static constexpr auto ConnectionControllerService = core::PropertyDefinitionBuilder<>::createProperty("SMB Connection Controller Service")
.withDescription("Specifies the SMB connection controller service to use for connecting to the SMB server.")
.isRequired(true)
.withAllowedTypes<SmbConnectionControllerService>()
.build();
EXTENSIONAPI static constexpr auto RemoteFile = core::PropertyDefinitionBuilder<>::createProperty("Input Directory")
.withDescription("The full path of the file to be retrieved from the remote server. Expression language supported. If left empty the path and filename attributes will be used.")
.isRequired(false)
.supportsExpressionLanguage(true)
.build();
EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 2>{
ConnectionControllerService,
RemoteFile
};

EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "A flowfile will be routed here for each successfully fetched file."};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "A flowfile will be routed here when failed to fetch its content."};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};

EXTENSIONAPI static constexpr auto ErrorCode = core::OutputAttributeDefinition<>{"error.code", { Failure }, "The error code returned by SMB when the fetch of a file fails."};
EXTENSIONAPI static constexpr auto ErrorMessage = core::OutputAttributeDefinition<>{"error.message", { Failure }, "The error message returned by SMB when the fetch of a file fails."};

EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{ FetchSmb::ErrorCode, ErrorMessage };

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;

ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

void initialize() override;
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;

private:
std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchSmb>::getLogger(uuid_);
};

} // namespace org::apache::nifi::minifi::extensions::smb
Loading

0 comments on commit 8dc4bb2

Please sign in to comment.