Skip to content

complete pipeline definition #1185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 18, 2023
6 changes: 3 additions & 3 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/common/Version.cpp.in ${VERSION_CPP_F
if (UNIX)
if (WITHOUTGDB)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fpic -fPIC -D_LARGEFILE64_SOURCE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -fpic -fPIC -D_LARGEFILE64_SOURCE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -fpic -fPIC -D_LARGEFILE64_SOURCE")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -g -ggdb -fpic -fPIC -D_LARGEFILE64_SOURCE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -g -ggdb -fpic -fPIC -D_LARGEFILE64_SOURCE")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++17 -Wall -g -ggdb -fpic -fPIC -D_LARGEFILE64_SOURCE")
endif ()
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0")
Expand Down Expand Up @@ -87,7 +87,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/dependencies.cmake)
# Subdirectories (modules).
set(SUB_DIRECTORIES_LIST
aggregator app_config checkpoint common config config_manager config_server_pb
controller event event_handler event_listener file_server go_pipeline helper input log_pb logger models monitor
controller event event_handler event_listener file_server flusher go_pipeline helper input log_pb logger models monitor
parser pipeline plugin plugin/creator plugin/instance plugin/interface polling processor processor/daemon profile_sender reader sdk sender shennong sls_control
fuse
)
Expand Down
2 changes: 1 addition & 1 deletion core/common/LogtailCommonFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ DEFINE_FLAG_STRING(local_machine_uuid, "use this value if not empty, for ut/debu
DEFINE_FLAG_STRING(user_defined_id_file, "", "user_defined_id");
DEFINE_FLAG_STRING(logtail_sys_conf_users_dir, "", "users");
DEFINE_FLAG_INT32(sls_client_send_timeout, "timeout time of one operation for SlsClient", 15);
DEFINE_FLAG_BOOL(sls_client_send_compress, "whether compresses the data or not when put data", true);
// DEFINE_FLAG_BOOL(sls_client_send_compress, "whether compresses the data or not when put data", true);
DEFINE_FLAG_INT32(send_retrytimes, "how many times should retry if PostLogStoreLogs operation fail", 3);
DEFINE_FLAG_INT32(default_StreamLog_tcp_port, "", 11111);
DEFINE_FLAG_INT32(default_StreamLog_poll_size_in_mb, "", 50);
Expand Down
2 changes: 1 addition & 1 deletion core/common/LogtailCommonFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ DECLARE_FLAG_STRING(default_aliuid);
DECLARE_FLAG_STRING(default_access_key_id);
DECLARE_FLAG_STRING(default_access_key);
DECLARE_FLAG_INT32(sls_client_send_timeout);
DECLARE_FLAG_BOOL(sls_client_send_compress);
// DECLARE_FLAG_BOOL(sls_client_send_compress);
DECLARE_FLAG_INT32(send_retrytimes);
DECLARE_FLAG_STRING(user_log_config);
DECLARE_FLAG_STRING(ilogtail_config);
Expand Down
126 changes: 126 additions & 0 deletions core/common/ParamExtractor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#include "common/ParamExtractor.h"

#include "boost/regex.hpp"

using namespace std;

namespace logtail {
string ExtractCurrentKey(const string& key) {
size_t pos = key.rfind('.');
if (pos == string::npos) {
return key;
}
return key.substr(0, pos);
}

bool GetOptionalBoolParam(const Json::Value& config, const string& key, bool& param, string& errorMsg) {
errorMsg.clear();
string curKey = ExtractCurrentKey(key);
const Json::Value* itr = config.find(curKey.c_str(), curKey.c_str() + curKey.length());
if (itr) {
if (!itr->isBool()) {
errorMsg = "param" + key + "is not of type bool";
return false;
}
param = itr->asBool();
}
return true;
}

bool GetOptionalIntParam(const Json::Value& config, const string& key, int32_t& param, string& errorMsg) {
errorMsg.clear();
string curKey = ExtractCurrentKey(key);
const Json::Value* itr = config.find(curKey.c_str(), curKey.c_str() + curKey.length());
if (itr) {
if (!itr->isInt()) {
errorMsg = "param" + key + "is not of type int";
return false;
}
param = itr->asInt();
}
return true;
}

bool GetOptionalUIntParam(const Json::Value& config, const string& key, uint32_t& param, string& errorMsg) {
errorMsg.clear();
string curKey = ExtractCurrentKey(key);
const Json::Value* itr = config.find(curKey.c_str(), curKey.c_str() + curKey.length());
if (itr) {
if (!itr->isUInt()) {
errorMsg = "param" + key + "is not of type uint";
return false;
}
param = itr->asUInt();
}
return true;
}

bool GetOptionalStringParam(const Json::Value& config, const string& key, string& param, string& errorMsg) {
errorMsg.clear();
string curKey = ExtractCurrentKey(key);
const Json::Value* itr = config.find(curKey.c_str(), curKey.c_str() + curKey.length());
if (itr) {
if (!itr->isString()) {
errorMsg = "param" + key + "is not of type string";
return false;
}
param = itr->asString();
}
return true;
}

bool GetMandatoryBoolParam(const Json::Value& config, const string& key, bool& param, string& errorMsg) {
errorMsg.clear();
if (!config.isMember(ExtractCurrentKey(key))) {
errorMsg = "madatory param" + key + "is missing";
return false;
}
return GetOptionalBoolParam(config, key, param, errorMsg);
}

bool GetMandatoryIntParam(const Json::Value& config, const string& key, int32_t& param, string& errorMsg) {
errorMsg.clear();
if (!config.isMember(ExtractCurrentKey(key))) {
errorMsg = "madatory param" + key + "is missing";
return false;
}
return GetOptionalIntParam(config, key, param, errorMsg);
}

bool GetMandatoryUIntParam(const Json::Value& config, const string& key, uint32_t& param, string& errorMsg) {
errorMsg.clear();
if (!config.isMember(ExtractCurrentKey(key))) {
errorMsg = "madatory param" + key + "is missing";
return false;
}
return GetOptionalUIntParam(config, key, param, errorMsg);
}

bool GetMandatoryStringParam(const Json::Value& config, const string& key, string& param, string& errorMsg) {
errorMsg.clear();
if (!config.isMember(ExtractCurrentKey(key))) {
errorMsg = "madatory param" + key + "is missing";
return false;
}
if (!GetOptionalStringParam(config, key, param, errorMsg)) {
return false;
}
if (param.empty()) {
errorMsg = "madatory string param" + key + "is empty";
return false;
}
return true;
}

bool IsRegexValid(const string& regStr) {
if (regStr.empty()) {
return true;
}
try {
boost::regex reg(regStr);
} catch (...) {
return false;
}
return true;
}
} // namespace logtail
163 changes: 163 additions & 0 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#pragma once

#include <cstdint>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>

#include "json/json.h"

#include "common/StringTools.h"
#include "logger/Logger.h"

#define PARAM_ERROR(logger, msg, module, config) \
LOG_ERROR(logger, ("failed to init pipeline", msg)("module", module)("config", config)); \
return false
#define PARAM_WARNING_IGNORE(logger, msg, module, config) \
LOG_WARNING(logger, \
("problem encountered during pipeline initialization", \
msg)("action", "ignore param")("module", module)("config", config))
#define PARAM_WARNING_DEFAULT(logger, msg, val, module, config) \
LOG_WARNING(logger, \
("problem encountered during pipeline initialization", msg)("action", "use default value instead")( \
"default value", ToString(val))("module", module)("config", config))

namespace logtail {
std::string ExtractCurrentKey(const std::string& key);

bool GetOptionalBoolParam(const Json::Value& config, const std::string& key, bool& param, std::string& errorMsg);

bool GetOptionalIntParam(const Json::Value& config, const std::string& key, int32_t& param, std::string& errorMsg);

bool GetOptionalUIntParam(const Json::Value& config, const std::string& key, uint32_t& param, std::string& errorMsg);

bool GetOptionalStringParam(const Json::Value& config,
const std::string& key,
std::string& param,
std::string& errorMsg);

template <class T>
bool GetOptionalListParam(const Json::Value& config,
const std::string& key,
std::vector<T>& param,
std::string& errorMsg) {
errorMsg.clear();
const Json::Value* itr = config.find(key.c_str(), key.c_str() + key.length());
if (itr) {
if (!itr->isArray()) {
errorMsg = "param" + key + "is not of type list in plugin ";
return false;
}
for (auto it = itr->begin(); it != itr->end(); ++it) {
if constexpr(std::is_same_v<T, bool>) {
if (!it->isBool()) {
errorMsg = "element in list param" + key + "is not of type bool in plugin ";
return false;
}
param.emplace_back(it->asBool());
} else if constexpr(std::is_same_v<T, uint32_t>) {
if (!it->isUInt()) {
errorMsg = "element in list param" + key + "is not of type uint in plugin ";
return false;
}
param.emplace_back(it->asUInt());
} else if constexpr(std::is_same_v<T, int32_t>) {
if (!it->isInt()) {
errorMsg = "element in list param" + key + "is not of type int in plugin ";
return false;
}
param.emplace_back(it->asInt());
} else if constexpr(std::is_same_v<T, std::string>) {
if (!it->isString()) {
errorMsg = "element in list param" + key + "is not of type string in plugin ";
return false;
}
param.emplace_back(it->asString());
} else {
errorMsg = "element in list param" + key + "is not supported in plugin ";
return false;
}
}
}
return true;
}

template <class T>
bool GetOptionalMapParam(const Json::Value& config,
const std::string& key,
std::unordered_map<std::string, T>& param,
std::string& errorMsg) {
errorMsg.clear();
const Json::Value* itr = config.find(key.c_str(), key.c_str() + key.length());
if (itr) {
if (!itr->isObject()) {
errorMsg = "param" + key + "is not of type map in plugin ";
return false;
}
for (auto it = itr->begin(); it != itr->end(); ++it) {
if constexpr(std::is_same_v<T, bool>) {
if (!it->isBool()) {
errorMsg = "value in map param" + key + "is not of type bool in plugin ";
return false;
}
param[it.name()] = it.deref().asBool();
} else if constexpr(std::is_same_v<T, uint32_t>) {
if (!it->isUInt()) {
errorMsg = "value in map param" + key + "is not of type uint in plugin ";
return false;
}
param[it.name()] = it.deref().asUInt();
} else if constexpr(std::is_same_v<T, int32_t>) {
if (!it->isInt()) {
errorMsg = "value in map param" + key + "is not of type int in plugin ";
return false;
}
param[it.name()] = it.deref().asInt();
} else if constexpr(std::is_same_v<T, std::string>) {
if (!it->isString()) {
errorMsg = "value in map param" + key + "is not of type string in plugin ";
return false;
}
param[it.name()] = it.deref().asString();
} else {
errorMsg = "value in map param" + key + "is not supported in plugin ";
return false;
}
}
}
return true;
}

bool GetMandatoryBoolParam(const Json::Value& config, const std::string& key, bool& param, std::string& errorMsg);

bool GetMandatoryIntParam(const Json::Value& config, const std::string& key, int32_t& param, std::string& errorMsg);

bool GetMandatoryUIntParam(const Json::Value& config, const std::string& key, uint32_t& param, std::string& errorMsg);

bool GetMandatoryStringParam(const Json::Value& config,
const std::string& key,
std::string& param,
std::string& errorMsg);

template <class T>
bool GetMandatoryListParam(const Json::Value& config,
const std::string& key,
std::vector<T>& param,
std::string& errorMsg) {
errorMsg.clear();
if (!config.isMember(key)) {
errorMsg = "madatory param" + key + "is missing in plugin ";
return false;
}
if (!GetOptionalListParam<T>(config, key, param, errorMsg)) {
return false;
}
if (param.empty()) {
errorMsg = "madatory list param" + key + "is empty in plugin ";
return false;
}
}

bool IsRegexValid(const std::string& regStr);
} // namespace logtail
1 change: 1 addition & 0 deletions core/config_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ target_link_libraries(${PROJECT_NAME} monitor)
target_link_libraries(${PROJECT_NAME} checkpoint)
target_link_libraries(${PROJECT_NAME} app_config)
target_link_libraries(${PROJECT_NAME} processor_daemon)
target_link_libraries(${PROJECT_NAME} flusher)
link_curl(${PROJECT_NAME})
1 change: 1 addition & 0 deletions core/config_manager/ConfigManagerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ DECLARE_FLAG_BOOL(enable_collection_mark);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);

DEFINE_FLAG_STRING(ALIYUN_LOG_FILE_TAGS, "default env file key to load tags", "");
DECLARE_FLAG_BOOL(sls_client_send_compress);

namespace logtail {

Expand Down
7 changes: 3 additions & 4 deletions core/config_manager/ConfigManagerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,14 @@ class ConfigManagerBase {
void InsertRegionAliuidMap(const std::string& region, const std::string& aliuid);
void ClearRegionAliuidMap();

void InsertRegion(const std::string& region);
void InsertProject(const std::string& project);

private:
// no copy
ConfigManagerBase(const ConfigManagerBase&);
ConfigManagerBase& operator=(const ConfigManagerBase&);

void InsertRegion(const std::string& region);

void ClearRegions();
/** XXX: path is not registered in this method
* @param path is the current dir that being registered
Expand Down Expand Up @@ -537,8 +538,6 @@ class ConfigManagerBase {

void MappingPluginConfig(const Json::Value& configValue, Config* config, Json::Value& pluginJson);

void InsertProject(const std::string& project);

void ClearProjects();

class DoubleBuffer <std::vector<sls_logs::LogTag>>mFileTags;
Expand Down
23 changes: 23 additions & 0 deletions core/flusher/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2022 iLogtail Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cmake_minimum_required(VERSION 2.9)
project(flusher)

file(GLOB LIB_SOURCE_FILES *.cpp *.h)
append_source_files(LIB_SOURCE_FILES)
add_library(${PROJECT_NAME} STATIC ${LIB_SOURCE_FILES})
target_link_libraries(${PROJECT_NAME} config)
target_link_libraries(${PROJECT_NAME} pipeline)
target_link_libraries(${PROJECT_NAME} sender)
Loading