[WIP] Add support for ingesting logs from Kafka #547

Draft · wants to merge 9 commits into main
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CMakeLists.txt
@@ -65,6 +65,8 @@ set(
JsonParser.cpp
JsonParser.hpp
JsonSerializer.hpp
KafkaReader.cpp
KafkaReader.hpp
ParsedMessage.hpp
ReaderUtils.cpp
ReaderUtils.hpp
@@ -194,6 +196,7 @@ target_link_libraries(
Boost::filesystem Boost::iostreams Boost::program_options
clp::string_utils
kql
rdkafka
MariaDBClient::MariaDBClient
${MONGOCXX_TARGET}
msgpack-cxx
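Note: the new `KafkaReader.{cpp,hpp}` sources are added above, but their contents aren't part of this diff excerpt. Since the target now links `rdkafka` (librdkafka's C library), `KafkaReader::consume_messages` — which `JsonParser::parse_from_kafka` below invokes with a per-message callback, and which signals failure by returning -1 — plausibly wraps librdkafka's legacy consume API. A minimal sketch under those assumptions (the member names `m_topic`/`m_partition` and the exact signature are guesses, not code from the PR):

```cpp
// Hypothetical sketch only -- the real KafkaReader.cpp isn't shown in this diff.
// Assumes m_topic (an rd_kafka_topic_t*) and m_partition were initialized via
// rd_kafka_topic_new() and rd_kafka_consume_start() in the constructor.
#include <sys/types.h>

#include <functional>

#include <librdkafka/rdkafka.h>

ssize_t KafkaReader::consume_messages(
        std::function<void(char*, size_t)> consume,
        size_t num_messages
) {
    size_t num_consumed{0};
    while (num_consumed < num_messages) {
        // Block for up to 1000ms waiting for the next message on the partition
        rd_kafka_message_t* message{rd_kafka_consume(m_topic, m_partition, 1000)};
        if (nullptr == message) {
            continue;  // Timed out; poll again
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR != message->err) {
            // A fuller implementation would likely special-case
            // RD_KAFKA_RESP_ERR__PARTITION_EOF rather than treating it as failure
            rd_kafka_message_destroy(message);
            return -1;
        }
        consume(static_cast<char*>(message->payload), message->len);
        rd_kafka_message_destroy(message);
        ++num_consumed;
    }
    return static_cast<ssize_t>(num_consumed);
}
```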
52 changes: 51 additions & 1 deletion components/core/src/clp_s/CommandLineArguments.cpp
@@ -148,6 +148,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
po::options_description compression_options("Compression options");
std::string metadata_db_config_file_path;
std::string input_path_list_file_path;
std::string input_source;
constexpr char cFilesystemSource[] = "filesystem";
constexpr char cKafkaSource[] = "kafka";
std::map<std::string, InputSourceType> input_source_map{
{cFilesystemSource, InputSourceType::Filesystem},
{cKafkaSource, InputSourceType::Kafka}
};
// clang-format off
compression_options.add_options()(
"compression-level",
@@ -189,6 +196,32 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"structurize-arrays",
po::bool_switch(&m_structurize_arrays),
"Structurize arrays instead of compressing them as clp strings."
)(
"input-source",
po::value<std::string>(&input_source)
->value_name("INPUT_SOURCE")
->default_value(cFilesystemSource),
"Input source from which to pull data."
)(
"kafka-topic",
po::value<std::string>(&m_kafka_topic),
"Kafka topic from which to pull messages."
)(
"kafka-partition",
po::value<int32_t>(&m_kafka_partition),
"Partition within a Kafka topic from which to pull messages."
)(
"kafka-starting-offset",
po::value<int64_t>(&m_kafka_offset),
"Starting offset within a Kafka partition from which to pull messages."
)(
"kafka-num-messages",
po::value<size_t>(&m_kafka_num_messages),
"Number of messages to consume from Kafka."
)(
"kafka-config-file",
po::value<std::string>(&m_kafka_config_file),
"Path to YAML config file with additional configuration for Kafka consumer."
);
// clang-format on
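
Taken together, the new options imply an invocation along these lines for the compression command (the archive directory, topic, and counts below are made-up illustrations; the YAML file would presumably carry librdkafka consumer properties such as `bootstrap.servers`, though its exact schema isn't shown in this diff):

```
clp-s c /path/to/archives \
    --input-source kafka \
    --kafka-topic my-app-logs \
    --kafka-partition 0 \
    --kafka-starting-offset 0 \
    --kafka-num-messages 100000 \
    --kafka-config-file consumer-config.yaml
```

For reference, librdkafka also reserves the special offsets RD_KAFKA_OFFSET_BEGINNING (-2) and RD_KAFKA_OFFSET_END (-1); whether `--kafka-starting-offset` forwards these unchanged depends on the KafkaReader implementation.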

@@ -238,10 +271,27 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
}
}

if (m_file_paths.empty()) {
auto it = input_source_map.find(input_source);
if (input_source_map.end() == it) {
throw std::invalid_argument("Unknown input source: " + input_source + ".");
}
m_input_source = it->second;

if (m_file_paths.empty() && InputSourceType::Kafka != m_input_source) {
throw std::invalid_argument("No input paths specified.");
}

if (InputSourceType::Kafka == m_input_source) {
if (0 == m_kafka_num_messages) {
throw std::invalid_argument("kafka-num-messages must be greater than zero.");
}

if (m_kafka_topic.empty()) {
throw std::invalid_argument("kafka-topic must be specified.");
}
}

// Parse and validate global metadata DB config
if (false == metadata_db_config_file_path.empty()) {
clp::GlobalMetadataDBConfig metadata_db_config;
25 changes: 25 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
@@ -36,6 +36,11 @@ class CommandLineArguments {
Stdout,
};

enum class InputSourceType : uint8_t {
Filesystem = 0,
Kafka
};

// Constructors
explicit CommandLineArguments(std::string const& program_name) : m_program_name(program_name) {}

@@ -108,6 +113,18 @@ class CommandLineArguments {

size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }

InputSourceType get_input_source() const { return m_input_source; }

std::string const& get_kafka_topic() const { return m_kafka_topic; }

int32_t get_kafka_partition() const { return m_kafka_partition; }

int64_t get_kafka_offset() const { return m_kafka_offset; }

size_t get_num_kafka_messages() const { return m_kafka_num_messages; }

std::string const& get_kafka_config_file() const { return m_kafka_config_file; }

private:
// Methods
/**
@@ -174,6 +191,14 @@
bool m_ordered_decompression{false};
size_t m_ordered_chunk_size{0};

// Input source variables
InputSourceType m_input_source{InputSourceType::Filesystem};
std::string m_kafka_topic;
int32_t m_kafka_partition{};
int64_t m_kafka_offset{};
size_t m_kafka_num_messages{};
std::string m_kafka_config_file;

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;

57 changes: 52 additions & 5 deletions components/core/src/clp_s/JsonParser.cpp
@@ -8,14 +8,16 @@

#include "archive_constants.hpp"
#include "JsonFileIterator.hpp"
#include "KafkaReader.hpp"

namespace clp_s {
JsonParser::JsonParser(JsonParserOption const& option)
: m_num_messages(0),
m_target_encoded_size(option.target_encoded_size),
: m_target_encoded_size(option.target_encoded_size),
m_max_document_size(option.max_document_size),
m_timestamp_key(option.timestamp_key),
m_structurize_arrays(option.structurize_arrays) {
m_structurize_arrays(option.structurize_arrays),
m_input_source(option.input_source),
m_kafka_option(option.kafka_option) {  // `option` is const, so a std::move here would silently copy
if (false == FileUtils::validate_path(option.file_paths)) {
exit(1);
}
@@ -418,7 +420,54 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s
while (!object_stack.empty());
}

bool JsonParser::parse_from_kafka() {
KafkaReader reader{
m_kafka_option.kafka_config_file,
m_kafka_option.topic,
m_kafka_option.partition,
m_kafka_option.offset
};
std::string buffer;
simdjson::ondemand::parser parser;
auto consume_message = [&](char* data, size_t len) -> void {
buffer.clear();
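// simdjson's ondemand parser requires SIMDJSON_PADDING bytes of usable
// capacity past the end of the document, so reserve it up front to avoid a
// reallocation inside iterate()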
if (buffer.capacity() < len + simdjson::SIMDJSON_PADDING) {
buffer.reserve(len + simdjson::SIMDJSON_PADDING);
}
buffer.insert(0, data, len);
auto record = parser.iterate(buffer);
parse_line(record, -1, "");

auto current_schema_id = m_archive_writer->add_schema(m_current_schema);
m_current_parsed_message.set_id(current_schema_id);
m_archive_writer
->append_message(current_schema_id, m_current_schema, m_current_parsed_message);
m_archive_writer->increment_uncompressed_size(len);

if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
split_archive();
}

m_current_schema.clear();
m_current_parsed_message.clear();
};

return -1 != reader.consume_messages(consume_message, m_kafka_option.num_messages);
}

bool JsonParser::parse() {
if (CommandLineArguments::InputSourceType::Kafka == m_input_source) {
try {
return parse_from_kafka();
} catch (TraceableException& e) {
SPDLOG_ERROR(
"Encountered error - {} - while trying to parse logs from Kafka",
e.what()
);
return false;
}
}

for (auto& file_path : m_file_paths) {
JsonFileIterator json_file_iterator(file_path, m_max_document_size);
if (false == json_file_iterator.is_open()) {
@@ -438,7 +487,6 @@ bool JsonParser::parse() {

simdjson::ondemand::document_stream::iterator json_it;

m_num_messages = 0;
size_t bytes_consumed_up_to_prev_archive = 0;
size_t bytes_consumed_up_to_prev_record = 0;
while (json_file_iterator.get_json(json_it)) {
@@ -476,7 +524,6 @@
m_archive_writer->close();
return false;
}
m_num_messages++;

int32_t current_schema_id = m_archive_writer->add_schema(m_current_schema);
m_current_parsed_message.set_id(current_schema_id);
26 changes: 25 additions & 1 deletion components/core/src/clp_s/JsonParser.hpp
@@ -11,6 +11,7 @@

#include "../clp/GlobalMySQLMetadataDB.hpp"
#include "ArchiveWriter.hpp"
#include "CommandLineArguments.hpp"
#include "DictionaryWriter.hpp"
#include "FileReader.hpp"
#include "FileWriter.hpp"
@@ -26,7 +27,18 @@
using namespace simdjson;

namespace clp_s {
struct KafkaOption {
std::string topic;
int32_t partition{};
int64_t offset{};
size_t num_messages{};
std::string kafka_config_file;
};

struct JsonParserOption {
CommandLineArguments::InputSourceType input_source{
CommandLineArguments::InputSourceType::Filesystem
};
std::vector<std::string> file_paths;
std::string timestamp_key;
std::string archives_dir;
@@ -36,6 +48,7 @@ struct JsonParserOption {
bool print_archive_stats;
bool structurize_arrays;
std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
KafkaOption kafka_option;
};

class JsonParser {
@@ -93,7 +106,13 @@ class JsonParser {
*/
void split_archive();

int m_num_messages;
/**
* Parses input from Kafka according to the configuration specified in `m_kafka_option`.
* @throws KafkaReader::OperationFailed
* @return true on success, false otherwise
*/
bool parse_from_kafka();

std::vector<std::string> m_file_paths;

Schema m_current_schema;
@@ -108,6 +127,11 @@
size_t m_target_encoded_size;
size_t m_max_document_size;
bool m_structurize_arrays{false};

CommandLineArguments::InputSourceType m_input_source{
CommandLineArguments::InputSourceType::Filesystem
};
KafkaOption m_kafka_option;
};
} // namespace clp_s

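For context on how these pieces fit together: the PR's changes to the `clp-s` entry point aren't included in this excerpt, but given the getters added to CommandLineArguments.hpp and the fields of `JsonParserOption`/`KafkaOption` above, the compression path would presumably be wired roughly as follows (a sketch, not code from the PR):

```cpp
// Hypothetical wiring in clp-s's compression entry point: the option fields and
// getter names come from the headers in this diff; the surrounding code is a guess.
JsonParserOption option{};
option.input_source = command_line_arguments.get_input_source();
if (CommandLineArguments::InputSourceType::Kafka == option.input_source) {
    option.kafka_option = KafkaOption{
            command_line_arguments.get_kafka_topic(),
            command_line_arguments.get_kafka_partition(),
            command_line_arguments.get_kafka_offset(),
            command_line_arguments.get_num_kafka_messages(),
            command_line_arguments.get_kafka_config_file()
    };
}

JsonParser parser{option};
if (false == parser.parse()) {
    // Handle failure (e.g., log and exit non-zero)
}
```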