From 67bc9f2f7cd3795cf3714e2275501c5747a409b3 Mon Sep 17 00:00:00 2001
From: Abigail Matthews <abigail.v.matthews@gmail.com>
Date: Mon, 25 Nov 2024 09:31:02 -0500
Subject: [PATCH] working deserializer API conversion

---
 components/core/src/clp_s/CMakeLists.txt      |  31 ++
 .../core/src/clp_s/CommandLineArguments.cpp   | 267 +++++++++++++-
 .../core/src/clp_s/CommandLineArguments.hpp   |  14 +-
 components/core/src/clp_s/JsonParser.cpp      | 338 ++++++++++++++++++
 components/core/src/clp_s/JsonParser.hpp      |  64 ++++
 components/core/src/clp_s/clp-s.cpp           | 268 ++++++++++++++
 6 files changed, 980 insertions(+), 2 deletions(-)

diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt
index 1656a5d59..9ca0c947e 100644
--- a/components/core/src/clp_s/CMakeLists.txt
+++ b/components/core/src/clp_s/CMakeLists.txt
@@ -8,11 +8,35 @@ set(
         ../clp/database_utils.hpp
         ../clp/Defs.h
         ../clp/ErrorCode.hpp
+        ../clp/ffi/ir_stream/decoding_methods.cpp
+        ../clp/ffi/ir_stream/decoding_methods.hpp
+        ../clp/ffi/ir_stream/Deserializer.hpp
+        ../clp/ffi/ir_stream/encoding_methods.cpp
+        ../clp/ffi/ir_stream/encoding_methods.hpp
+        ../clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp
+        ../clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp
+        ../clp/ffi/ir_stream/Serializer.cpp
+        ../clp/ffi/ir_stream/Serializer.hpp
+        ../clp/ffi/ir_stream/utils.cpp
+        ../clp/ffi/ir_stream/utils.hpp
+        ../clp/ffi/KeyValuePairLogEvent.cpp
+        ../clp/ffi/KeyValuePairLogEvent.hpp
+        ../clp/ffi/SchemaTree.cpp
+        ../clp/ffi/SchemaTree.hpp
+        ../clp/ffi/utils.cpp
+        ../clp/ffi/utils.hpp
+        ../clp/ffi/Value.hpp
+        ../clp/FileDescriptor.cpp
+        ../clp/FileDescriptor.hpp
         ../clp/GlobalMetadataDB.hpp
         ../clp/GlobalMetadataDBConfig.cpp
         ../clp/GlobalMetadataDBConfig.hpp
         ../clp/GlobalMySQLMetadataDB.cpp
         ../clp/GlobalMySQLMetadataDB.hpp
+        ../clp/ir/EncodedTextAst.cpp
+        ../clp/ir/EncodedTextAst.hpp
+        ../clp/ir/parsing.cpp
+        ../clp/ir/parsing.hpp
         ../clp/MySQLDB.cpp
         ../clp/MySQLDB.hpp
         ../clp/MySQLParamBindings.cpp
@@ -23,9 +47,16 @@ set(
         ../clp/networking/socket_utils.hpp
         ../clp/ReaderInterface.cpp
         ../clp/ReaderInterface.hpp
+        ../clp/ReadOnlyMemoryMappedFile.cpp
+        ../clp/ReadOnlyMemoryMappedFile.hpp
         ../clp/streaming_archive/ArchiveMetadata.cpp
         ../clp/streaming_archive/ArchiveMetadata.hpp
+        ../clp/streaming_compression/zstd/Decompressor.cpp
+        ../clp/streaming_compression/zstd/Decompressor.hpp
         ../clp/TraceableException.hpp
+        ../clp/time_types.hpp
+        ../clp/utf8_utils.cpp
+        ../clp/utf8_utils.hpp
         ../clp/WriterInterface.cpp
         ../clp/WriterInterface.hpp
 )
diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp
index cf69a066c..ef9d62cd5 100644
--- a/components/core/src/clp_s/CommandLineArguments.cpp
+++ b/components/core/src/clp_s/CommandLineArguments.cpp
@@ -106,11 +106,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
                 std::cerr << "  c - compress" << std::endl;
                 std::cerr << "  x - decompress" << std::endl;
                 std::cerr << "  s - search" << std::endl;
+                std::cerr << "  r - JSON to IR Format" << std::endl;
+                std::cerr << "  i - compress IR format" << std::endl;
                 std::cerr << std::endl;
                 std::cerr << "Try "
                           << " c --help OR"
                           << " x --help OR"
-                          << " s --help for command-specific details." << std::endl;
+                          << " s --help OR"
+                          << " r --help OR"
+                          << " i --help for command-specific details." << std::endl;
 
                 po::options_description visible_options;
                 visible_options.add(general_options);
@@ -125,6 +129,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
             case (char)Command::Compress:
             case (char)Command::Extract:
             case (char)Command::Search:
+            case (char)Command::JsonToIr:
+            case (char)Command::IrCompress:
                 m_command = (Command)command_input;
                 break;
             default:
@@ -686,6 +692,257 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
                         "The --count-by-time and --count options are mutually exclusive."
                 );
             }
+        } else if (Command::IrCompress == m_command) {
+            po::options_description compression_positional_options;
+            // clang-format off
+             compression_positional_options.add_options()(
+                     "archives-dir",
+                     po::value<std::string>(&m_archives_dir)->value_name("DIR"),
+                     "output directory"
+             )(
+                     "input-paths",
+                     po::value<std::vector<std::string>>(&m_file_paths)->value_name("PATHS"),
+                     "input paths"
+             );
+            // clang-format on
+
+            po::options_description compression_options("Compression options");
+            std::string metadata_db_config_file_path;
+            std::string input_path_list_file_path;
+            // clang-format off
+            compression_options.add_options()(
+                    "compression-level",
+                    po::value<int>(&m_compression_level)->value_name("LEVEL")->
+                        default_value(m_compression_level),
+                    "1 (fast/low compression) to 9 (slow/high compression)."
+            )(
+                    "target-encoded-size",
+                    po::value<size_t>(&m_target_encoded_size)->value_name("TARGET_ENCODED_SIZE")->
+                        default_value(m_target_encoded_size),
+                    "Target size (B) for the dictionaries and encoded messages before a new "
+                    "archive is created."
+            )(
+                    "max-document-size",
+                    po::value<size_t>(&m_max_document_size)->value_name("DOC_SIZE")->
+                        default_value(m_max_document_size),
+                    "Maximum allowed size (B) for a single document before compression fails."
+            )(
+                    "timestamp-key",
+                    po::value<std::string>(&m_timestamp_key)->value_name("TIMESTAMP_COLUMN_KEY")->
+                        default_value(m_timestamp_key),
+                    "Path (e.g. x.y) for the field containing the log event's timestamp."
+            )(
+                    "db-config-file",
+                    po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")->
+                    default_value(metadata_db_config_file_path),
+                    "Global metadata DB YAML config"
+            )(
+                    "files-from,f",
+                    po::value<std::string>(&input_path_list_file_path)
+                            ->value_name("FILE")
+                            ->default_value(input_path_list_file_path),
+                    "Compress files specified in FILE"
+            )(
+                    "print-archive-stats",
+                    po::bool_switch(&m_print_archive_stats),
+                    "Print statistics (json) about the archive after it's compressed."
+            );
+            // clang-format on
+
+            po::positional_options_description positional_options;
+            positional_options.add("archives-dir", 1);
+            positional_options.add("input-paths", -1);
+
+            po::options_description all_compression_options;
+            all_compression_options.add(compression_options);
+            all_compression_options.add(compression_positional_options);
+
+            std::vector<std::string> unrecognized_options
+                    = po::collect_unrecognized(parsed.options, po::include_positional);
+            unrecognized_options.erase(unrecognized_options.begin());
+            po::store(
+                    po::command_line_parser(unrecognized_options)
+                            .options(all_compression_options)
+                            .positional(positional_options)
+                            .run(),
+                    parsed_command_line_options
+            );
+            po::notify(parsed_command_line_options);
+
+            if (parsed_command_line_options.count("help")) {
+                print_ir_compression_usage();
+
+                std::cerr << "Examples:\n";
+                std::cerr << "  # Compress file1.ir and dir1 into archives-dir\n";
+                std::cerr << "  " << m_program_name << " i archives-dir file1.ir dir1\n";
+
+                po::options_description visible_options;
+                visible_options.add(general_options);
+                visible_options.add(compression_options);
+                std::cerr << visible_options << '\n';
+                return ParsingResult::InfoCommand;
+            }
+
+            if (m_archives_dir.empty()) {
+                throw std::invalid_argument("No archives directory specified.");
+            }
+
+            if (false == input_path_list_file_path.empty()) {
+                if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) {
+                    SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path);
+                    return ParsingResult::Failure;
+                }
+            }
+
+            if (m_file_paths.empty()) {
+                throw std::invalid_argument("No input paths specified.");
+            }
+
+            // Parse and validate global metadata DB config
+            if (false == metadata_db_config_file_path.empty()) {
+                clp::GlobalMetadataDBConfig metadata_db_config;
+                try {
+                    metadata_db_config.parse_config_file(metadata_db_config_file_path);
+                } catch (std::exception& e) {
+                    SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
+                    return ParsingResult::Failure;
+                }
+
+                if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
+                    != metadata_db_config.get_metadata_db_type())
+                {
+                    SPDLOG_ERROR(
+                            "Invalid metadata database type for {}; only supported type is MySQL.",
+                            m_program_name
+                    );
+                    return ParsingResult::Failure;
+                }
+
+                m_metadata_db_config = std::move(metadata_db_config);
+            }
+        } else if ((char)Command::JsonToIr == command_input) {
+            po::options_description compression_positional_options;
+            // clang-format off
+             compression_positional_options.add_options()(
+                     "ir-dir",
+                     po::value<std::string>(&m_archives_dir)->value_name("DIR"),
+                     "output directory"
+             )(
+                     "input-paths",
+                     po::value<std::vector<std::string>>(&m_file_paths)->value_name("PATHS"),
+                     "input paths"
+             );
+            // clang-format on
+
+            po::options_description compression_options("Compression options");
+            std::string metadata_db_config_file_path;
+            std::string input_path_list_file_path;
+            // clang-format off
+            // Options for the JSON-to-IR command; parsed values are stored in the bound variables.
+            compression_options.add_options()(
+                    "compression-level",
+                    po::value<int>(&m_compression_level)->value_name("LEVEL")->
+                        default_value(m_compression_level),
+                    "1 (fast/low compression) to 9 (slow/high compression)."
+            )(
+                    "max-document-size",
+                    po::value<size_t>(&m_max_document_size)->value_name("DOC_SIZE")->
+                        default_value(m_max_document_size),
+                    "Maximum allowed size (B) for a single document before ir generation fails."
+            )(
+                    "max-ir-buffer-size",
+                    po::value<size_t>(&m_max_ir_buffer_size)->value_name("BUFFER_SIZE")->
+                        default_value(m_max_ir_buffer_size),
+                    "Maximum allowed size (B) for an in memory IR buffer before being written to file."
+            )(
+                    "encoding-type",
+                    po::value<int>(&m_encoding_type)->value_name("ENCODING_TYPE")->
+                        default_value(m_encoding_type),
+                    "4 (four byte encoding) or 8 (eight byte encoding)"
+            )(
+                    "db-config-file",
+                    po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")->
+                    default_value(metadata_db_config_file_path),
+                    "Global metadata DB YAML config"
+            )(
+                    "files-from,f",
+                    po::value<std::string>(&input_path_list_file_path)->value_name("FILE")->
+                        default_value(input_path_list_file_path),
+                    "Compress files specified in FILE"
+            );
+            // clang-format on
+
+            po::positional_options_description positional_options;
+            positional_options.add("ir-dir", 1);
+            positional_options.add("input-paths", -1);
+
+            po::options_description all_compression_options;
+            all_compression_options.add(compression_options);
+            all_compression_options.add(compression_positional_options);
+
+            std::vector<std::string> unrecognized_options
+                    = po::collect_unrecognized(parsed.options, po::include_positional);
+            unrecognized_options.erase(unrecognized_options.begin());
+            po::store(
+                    po::command_line_parser(unrecognized_options)
+                            .options(all_compression_options)
+                            .positional(positional_options)
+                            .run(),
+                    parsed_command_line_options
+            );
+            po::notify(parsed_command_line_options);
+
+            if (parsed_command_line_options.count("help")) {
+                print_json_to_ir_usage();
+
+                std::cerr << "Examples:\n";
+                std::cerr << "  # Parse file1.json and dir1 into irs-dir\n";
+                std::cerr << "  " << m_program_name << " r irs-dir file1.json dir1\n";
+
+                po::options_description visible_options;
+                visible_options.add(general_options);
+                visible_options.add(compression_options);
+                std::cerr << visible_options << '\n';
+                return ParsingResult::InfoCommand;
+            }
+
+            if (m_archives_dir.empty()) {
+                throw std::invalid_argument("No IRs directory specified.");
+            }
+
+            if (false == input_path_list_file_path.empty()) {
+                if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) {
+                    SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path);
+                    return ParsingResult::Failure;
+                }
+            }
+
+            if (m_file_paths.empty()) {
+                throw std::invalid_argument("No input paths specified.");
+            }
+
+            // Parse and validate global metadata DB config
+            if (false == metadata_db_config_file_path.empty()) {
+                clp::GlobalMetadataDBConfig metadata_db_config;
+                try {
+                    metadata_db_config.parse_config_file(metadata_db_config_file_path);
+                } catch (std::exception& e) {
+                    SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
+                    return ParsingResult::Failure;
+                }
+
+                if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
+                    != metadata_db_config.get_metadata_db_type())
+                {
+                    SPDLOG_ERROR(
+                            "Invalid metadata database type for {}; only supported type is MySQL.",
+                            m_program_name
+                    );
+                    return ParsingResult::Failure;
+                }
+
+                m_metadata_db_config = std::move(metadata_db_config);
+            }
         }
     } catch (std::exception& e) {
         SPDLOG_ERROR("{}", e.what());
@@ -799,4 +1056,12 @@ void CommandLineArguments::print_search_usage() const {
                  " [OUTPUT_HANDLER [OUTPUT_HANDLER_OPTIONS]]"
               << std::endl;
 }
+// Prints the one-line usage summary for the JSON-to-IR ("r") command to stderr.
+void CommandLineArguments::print_json_to_ir_usage() const {
+    std::cerr << "Usage: " << m_program_name << " r [OPTIONS] IRS_DIR [FILE/DIR ...]\n";
+}
+// Prints the one-line usage summary for the IR compression ("i") command to stderr.
+void CommandLineArguments::print_ir_compression_usage() const {
+    std::cerr << "Usage: " << m_program_name << " i [OPTIONS] ARCHIVES_DIR [FILE/DIR ...]\n";
+}
 }  // namespace clp_s
diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp
index 798e42728..ae4d98ddc 100644
--- a/components/core/src/clp_s/CommandLineArguments.hpp
+++ b/components/core/src/clp_s/CommandLineArguments.hpp
@@ -26,7 +26,9 @@ class CommandLineArguments {
     enum class Command : char {
         Compress = 'c',
         Extract = 'x',
-        Search = 's'
+        Search = 's',
+        JsonToIr = 'r',
+        IrCompress = 'i'
     };
 
     enum class OutputHandlerType : uint8_t {
@@ -60,6 +62,10 @@ class CommandLineArguments {
 
     size_t get_max_document_size() const { return m_max_document_size; }
 
+    [[nodiscard]] auto get_max_ir_buffer_size() const -> size_t { return m_max_ir_buffer_size; }
+
+    [[nodiscard]] auto get_encoding_type() const -> int { return m_encoding_type; }
+
     [[nodiscard]] bool print_archive_stats() const { return m_print_archive_stats; }
 
     std::string const& get_mongodb_uri() const { return m_mongodb_uri; }
@@ -159,6 +165,10 @@ class CommandLineArguments {
 
     void print_decompression_usage() const;
 
+    void print_ir_compression_usage() const;
+
+    void print_json_to_ir_usage() const;
+
     void print_search_usage() const;
 
     // Variables
@@ -178,6 +188,8 @@ class CommandLineArguments {
     bool m_ordered_decompression{false};
     size_t m_ordered_chunk_size{0};
     size_t m_minimum_table_size{1ULL * 1024 * 1024};  // 1 MB
+    int m_encoding_type{8};
+    size_t m_max_ir_buffer_size{512ULL * 1024 * 1024};
 
     // Metadata db variables
     std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp
index 7d4af1469..e059398eb 100644
--- a/components/core/src/clp_s/JsonParser.cpp
+++ b/components/core/src/clp_s/JsonParser.cpp
@@ -2,14 +2,82 @@
 
 #include <iostream>
 #include <stack>
+#include <unordered_map>
 
 #include <simdjson.h>
 #include <spdlog/spdlog.h>
 
+#include "../clp/ffi/ir_stream/decoding_methods.hpp"
+#include "../clp/ffi/ir_stream/Deserializer.hpp"
+#include "../clp/ffi/KeyValuePairLogEvent.hpp"
+#include "../clp/ffi/SchemaTree.hpp"
+#include "../clp/ffi/utils.hpp"
+#include "../clp/ffi/Value.hpp"
+#include "../clp/streaming_compression/zstd/Decompressor.hpp"
+#include "../clp/time_types.hpp"
 #include "archive_constants.hpp"
+#include "ErrorCode.hpp"
 #include "JsonFileIterator.hpp"
 
+using clp::ffi::ir_stream::Deserializer;
+using clp::ffi::ir_stream::IRErrorCode;
+using clp::ffi::KeyValuePairLogEvent;
+using clp::UtcOffset;
+
 namespace clp_s {
+
+/**
+ * Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` and keeps only the most
+ * recently deserialized log event so that the caller can consume it between IR units.
+ */
+class IrUnitHandler {
+public:
+    // Stores `log_event`, replacing any previously stored event.
+    [[nodiscard]] auto handle_log_event(KeyValuePairLogEvent&& log_event) -> IRErrorCode {
+        m_deserialized_log_event.emplace(std::move(log_event));
+        return IRErrorCode::IRErrorCode_Success;
+    }
+
+    // UTC offset changes are ignored.
+    [[nodiscard]] static auto handle_utc_offset_change(
+            [[maybe_unused]] UtcOffset utc_offset_old,
+            [[maybe_unused]] UtcOffset utc_offset_new
+    ) -> IRErrorCode {
+        return IRErrorCode::IRErrorCode_Success;
+    }
+
+    // Schema-tree node insertions are ignored.
+    [[nodiscard]] auto handle_schema_tree_node_insertion(
+            [[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator
+    ) -> IRErrorCode {
+        return IRErrorCode::IRErrorCode_Success;
+    }
+
+    // Records that the end of the stream was reached.
+    [[nodiscard]] auto handle_end_of_stream() -> IRErrorCode {
+        m_is_complete = true;
+        return IRErrorCode::IRErrorCode_Success;
+    }
+
+    // @return The stored log event, or an empty optional if none is stored.
+    [[nodiscard]] auto get_deserialized_log_event(
+    ) const -> std::optional<KeyValuePairLogEvent> const& {
+        return m_deserialized_log_event;
+    }
+
+    // Drops the stored log event and resets the end-of-stream flag.
+    void clear() {
+        m_deserialized_log_event.reset();
+        m_is_complete = false;
+    }
+
+    [[nodiscard]] auto is_complete() const -> bool { return m_is_complete; }
+
+private:
+    std::optional<KeyValuePairLogEvent> m_deserialized_log_event;
+    bool m_is_complete{false};
+};
+
 JsonParser::JsonParser(JsonParserOption const& option)
         : m_num_messages(0),
           m_target_encoded_size(option.target_encoded_size),
@@ -521,6 +589,276 @@ bool JsonParser::parse() {
     return true;
 }
 
+/**
+ * Translates an IR schema-tree node type into the corresponding archive node type. For `Str` and
+ * `Obj` nodes, the result also depends on the node's value.
+ * @param ir_node_type Type of the node in the IR schema tree.
+ * @param node_has_value Whether a value accompanies the node.
+ * @param node_value The node's value. Must be engaged whenever `node_has_value` is true.
+ * @return The archive node type, or `NodeType::Unknown` for IR node types not handled here.
+ */
+auto JsonParser::get_archive_node_type(
+        clp::ffi::SchemaTree::Node::Type ir_node_type,
+        bool node_has_value,
+        std::optional<clp::ffi::Value> const& node_value
+) -> NodeType {
+    using IrNodeType = clp::ffi::SchemaTree::Node::Type;
+
+    switch (ir_node_type) {
+        // Primitive and array types map one-to-one.
+        case IrNodeType::Int:
+            return NodeType::Integer;
+        case IrNodeType::Float:
+            return NodeType::Float;
+        case IrNodeType::Bool:
+            return NodeType::Boolean;
+        case IrNodeType::UnstructuredArray:
+            return NodeType::UnstructuredArray;
+        case IrNodeType::Str: {
+            // A value stored as a `std::string` maps to VarString; anything else (including no
+            // value at all) maps to ClpString.
+            bool const is_plain_string{node_value && node_value->is<std::string>()};
+            return is_plain_string ? NodeType::VarString : NodeType::ClpString;
+        }
+        case IrNodeType::Obj:
+            // A null value maps to NullValue; a non-null value, or no value at all, maps to
+            // Object.
+            if (node_has_value && node_value->is_null()) {
+                return NodeType::NullValue;
+            }
+            return NodeType::Object;
+        default:
+            break;
+    }
+    return NodeType::Unknown;
+}
+
+/**
+ * Returns the archive node ID for an IR node with the given archive type, creating the archive
+ * node (and, recursively, any missing ancestors as `NodeType::Object`) if it doesn't exist yet.
+ * @param ir_node_to_archive_node_unordered_map Cache from IR node ID to the (archive type,
+ * archive node ID) pairs created for it so far; updated when a new archive node is added.
+ * @param ir_node_id ID of the node in `ir_tree`.
+ * @param archive_node_type
+ * @param ir_tree
+ * @return The archive node ID.
+ * @throw char const* if the node's key fails UTF-8 validation.
+ */
+auto JsonParser::get_archive_node_id(
+        std::unordered_map<int32_t, std::vector<std::pair<NodeType, int32_t>>>&
+                ir_node_to_archive_node_unordered_map,
+        uint32_t ir_node_id,
+        NodeType archive_node_type,
+        clp::ffi::SchemaTree const& ir_tree
+) -> int {
+    auto const cached_it = ir_node_to_archive_node_unordered_map.find(ir_node_id);
+    if (ir_node_to_archive_node_unordered_map.end() != cached_it) {
+        // Iterate by reference so the cached vector isn't copied on every lookup.
+        for (auto const& [cached_type, cached_id] : cached_it->second) {
+            if (cached_type == archive_node_type) {
+                return cached_id;
+            }
+        }
+    }
+
+    auto const& curr_node = ir_tree.get_node(ir_node_id);
+    int32_t parent_node_id{-1};
+    auto const parent_of_curr_node_id = curr_node.get_parent_id();
+    if (parent_of_curr_node_id.has_value()) {
+        parent_node_id = get_archive_node_id(
+                ir_node_to_archive_node_unordered_map,
+                parent_of_curr_node_id.value(),
+                NodeType::Object,
+                ir_tree
+        );
+    }
+    auto const escaped_key = clp::ffi::validate_and_escape_utf8_string(curr_node.get_key_name());
+    if (false == escaped_key.has_value()) {
+        throw "Key is not UTF-8 compliant";
+    }
+    int const curr_node_archive_id
+            = m_archive_writer->add_node(parent_node_id, archive_node_type, escaped_key.value());
+    // Look up again: the recursive call may have rehashed the map, invalidating `cached_it`.
+    ir_node_to_archive_node_unordered_map[ir_node_id]
+            .emplace_back(archive_node_type, curr_node_archive_id);
+    return curr_node_archive_id;
+}
+
+/**
+ * Converts one key-value pair log event into an archive message. Each (IR node ID, value) pair is
+ * mapped to an archive node, its value (if the node type carries one) is added to the current
+ * parsed message, and the resulting schema and message are appended to the archive writer.
+ * NOTE: `m_current_schema` and `m_current_parsed_message` are appended to but never cleared here;
+ * the caller is responsible for resetting them between events.
+ * @param kv The log event to convert.
+ * @param ir_node_to_archive_node_unordered_map Cache from IR node ID to the archive nodes created
+ * for it; forwarded to `get_archive_node_id`.
+ * @throw char const* if a key or a string value fails UTF-8 validation.
+ * @throw Whatever `std::optional::value()` throws if a typed node has no value or an encoded text
+ * AST can't be decoded.
+ */
+void JsonParser::parse_kv_log_event(
+        KeyValuePairLogEvent const& kv,
+        std::unordered_map<int32_t, std::vector<std::pair<NodeType, int32_t>>>&
+                ir_node_to_archive_node_unordered_map
+) {
+    // Decodes a value holding an eight-byte or four-byte encoded text AST back into text.
+    auto const decode_encoded_text = [](clp::ffi::Value const& value) -> std::string {
+        if (value.is<clp::ir::EightByteEncodedTextAst>()) {
+            return value.get_immutable_view<clp::ir::EightByteEncodedTextAst>()
+                    .decode_and_unparse()
+                    .value();
+        }
+        return value.get_immutable_view<clp::ir::FourByteEncodedTextAst>()
+                .decode_and_unparse()
+                .value();
+    };
+
+    clp::ffi::SchemaTree const& tree = kv.get_schema_tree();
+    for (auto const& [ir_node_id, optional_value] : kv.get_node_id_value_pairs()) {
+        // Forward `optional_value` as-is so the value isn't copied into a temporary optional.
+        NodeType const archive_node_type = get_archive_node_type(
+                tree.get_node(ir_node_id).get_type(),
+                optional_value.has_value(),
+                optional_value
+        );
+        // Exceptions from the lookup propagate to the caller unchanged.
+        int const node_id = get_archive_node_id(
+                ir_node_to_archive_node_unordered_map,
+                ir_node_id,
+                archive_node_type,
+                tree
+        );
+
+        // Add the node's value to the message; node types without a value only affect the schema.
+        switch (archive_node_type) {
+            case NodeType::Integer: {
+                int64_t i64_value
+                        = optional_value.value().get_immutable_view<clp::ffi::value_int_t>();
+                m_current_parsed_message.add_value(node_id, i64_value);
+                break;
+            }
+            case NodeType::Float: {
+                double d_value
+                        = optional_value.value().get_immutable_view<clp::ffi::value_float_t>();
+                m_current_parsed_message.add_value(node_id, d_value);
+                break;
+            }
+            case NodeType::Boolean: {
+                bool b_value
+                        = optional_value.value().get_immutable_view<clp::ffi::value_bool_t>();
+                m_current_parsed_message.add_value(node_id, b_value);
+                break;
+            }
+            case NodeType::VarString: {
+                auto escaped_str = clp::ffi::validate_and_escape_utf8_string(
+                        optional_value.value().get_immutable_view<std::string>()
+                );
+                if (false == escaped_str.has_value()) {
+                    throw "String is not utf8 compliant";
+                }
+                m_current_parsed_message.add_value(node_id, escaped_str.value());
+                break;
+            }
+            case NodeType::ClpString: {
+                std::string const decoded_text{decode_encoded_text(optional_value.value())};
+                // Validate the whole string rather than its `c_str()` so that text containing
+                // embedded NUL bytes isn't cut short.
+                auto escaped_text = clp::ffi::validate_and_escape_utf8_string(decoded_text);
+                if (false == escaped_text.has_value()) {
+                    throw "Encoded string is not utf8 compliant";
+                }
+                m_current_parsed_message.add_value(node_id, escaped_text.value());
+                break;
+            }
+            case NodeType::UnstructuredArray: {
+                // Arrays are stored as decoded text without UTF-8 validation/escaping.
+                std::string array_str{decode_encoded_text(optional_value.value())};
+                m_current_parsed_message.add_value(node_id, array_str);
+                break;
+            }
+            default:
+                // Don't need to add value for obj or null
+                break;
+        }
+        m_current_schema.insert_ordered(node_id);
+    }
+
+    // Register the event's schema, then append the message under the resulting schema ID.
+    int32_t current_schema_id = m_archive_writer->add_schema(m_current_schema);
+    m_current_parsed_message.set_id(current_schema_id);
+    m_archive_writer->append_message(current_schema_id, m_current_schema, m_current_parsed_message);
+}
+
+/**
+ * Adds the log events of every Zstd-compressed KV-pair IR file in `m_file_paths` to the archive.
+ * @return false (after closing the archive writer) if a file is empty, its deserializer can't be
+ * created, or a log event can't be parsed; true otherwise.
+ */
+auto JsonParser::parse_from_ir() -> bool {
+    std::unordered_map<int32_t, std::vector<std::pair<NodeType, int32_t>>>
+            ir_node_to_archive_node_unordered_map;
+
+    for (auto& file_path : m_file_paths) {
+        // Keep `file_size`'s full-width result; an `int` can't represent sizes of 2 GiB or more.
+        auto const file_size_in_bytes = std::filesystem::file_size(file_path);
+        if (0 == file_size_in_bytes) {
+            m_archive_writer->close();
+            return false;
+        }
+        clp::streaming_compression::zstd::Decompressor zd;
+        zd.open(file_path);
+        auto const close_and_fail = [&]() -> bool {
+            zd.close();
+            m_archive_writer->close();
+            return false;
+        };
+        auto deserializer_result{Deserializer<IrUnitHandler>::create(zd, IrUnitHandler{})};
+        if (deserializer_result.has_error()) {
+            return close_and_fail();
+        }
+        auto& deserializer = deserializer_result.value();
+        auto& ir_unit_handler{deserializer.get_ir_unit_handler()};
+        while (true) {
+            auto const ir_unit_result{deserializer.deserialize_next_ir_unit(zd)};
+            if (ir_unit_result.has_error()) {
+                SPDLOG_WARN("Failed to deserialize IR unit; skipping rest of {}", file_path);
+                break;
+            }
+            if (clp::ffi::ir_stream::IrUnitType::EndOfStream == ir_unit_result.value()) {
+                break;
+            }
+            if (clp::ffi::ir_stream::IrUnitType::LogEvent != ir_unit_result.value()) {
+                continue;
+            }
+            auto const& log_event = ir_unit_handler.get_deserialized_log_event().value();
+            m_current_schema.clear();
+            try {
+                parse_kv_log_event(log_event, ir_node_to_archive_node_unordered_map);
+            } catch (char const* msg) {
+                // The parsing helpers throw string literals, which `std::string` handlers miss.
+                SPDLOG_ERROR("ERROR: {}", msg);
+                return close_and_fail();
+            } catch (std::string const& msg) {
+                SPDLOG_ERROR("ERROR: {}", msg);
+                return close_and_fail();
+            } catch (...) {
+                SPDLOG_ERROR("ERROR: Encountered error while parsing a kv log event");
+                return close_and_fail();
+            }
+            if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
+                ir_node_to_archive_node_unordered_map.clear();
+                split_archive();
+            }
+            ir_unit_handler.clear();
+            m_current_parsed_message.clear();
+        }
+        ir_node_to_archive_node_unordered_map.clear();
+        zd.close();
+    }
+    return true;
+}
+
 void JsonParser::store() {
     m_archive_writer->close();
 }
diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp
index af6b024ef..f69428825 100644
--- a/components/core/src/clp_s/JsonParser.hpp
+++ b/components/core/src/clp_s/JsonParser.hpp
@@ -3,12 +3,16 @@
 
 #include <map>
 #include <string>
+#include <unordered_map>
 #include <variant>
 #include <vector>
 
 #include <boost/uuid/random_generator.hpp>
 #include <simdjson.h>
 
+#include "../clp/ffi/KeyValuePairLogEvent.hpp"
+#include "../clp/ffi/SchemaTree.hpp"
+#include "../clp/ffi/Value.hpp"
 #include "../clp/GlobalMySQLMetadataDB.hpp"
 #include "ArchiveWriter.hpp"
 #include "DictionaryWriter.hpp"
@@ -25,6 +29,8 @@
 
 using namespace simdjson;
 
+using clp::ffi::KeyValuePairLogEvent;
+
 namespace clp_s {
 struct JsonParserOption {
     std::vector<std::string> file_paths;
@@ -39,6 +45,15 @@ struct JsonParserOption {
     std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
 };
 
+struct JsonToIrParserOption {  // Options for converting JSON input files into IR files
+    std::vector<std::string> file_paths;  // Input paths given on the command line
+    std::string irs_dir;  // Directory that receives the generated "<input filename>.ir" files
+    size_t max_document_size{};  // Copied from the command line arguments
+    size_t max_ir_buffer_size{};  // Buffered IR size at which the buffer is written out
+    int compression_level{};  // Level handed to the zstd compressor
+    int encoding{};  // 4 selects int32_t encoded variables; any other value selects int64_t
+};
+
 class JsonParser {
 public:
     class OperationFailed : public TraceableException {
@@ -51,6 +66,8 @@ class JsonParser {
     // Constructor
     explicit JsonParser(JsonParserOption const& option);
 
+    explicit JsonParser(JsonToIrParserOption const& option);  // No implicit option conversion
+
     // Destructor
     ~JsonParser() = default;
 
@@ -60,6 +77,12 @@ class JsonParser {
      */
     [[nodiscard]] bool parse();
 
+    /**
+     * Parses the key-value pair IR stream and stores the data in the archive.
+     * @return Whether parsing succeeded; false if a log event could not be parsed
+     */
+    [[nodiscard]] auto parse_from_ir() -> bool;
+
     /**
      * Writes the metadata and archive data to disk.
      */
@@ -75,6 +98,47 @@ class JsonParser {
      */
     void parse_line(ondemand::value line, int32_t parent_node_id, std::string const& key);
 
+    /**
+     * Determines the archive node type based on the IR node type and value.
+     * @param ir_node_type Schema node type from the IR stream
+     * @param node_has_value Whether the node has a value
+     * @param node_value The IR schema node's value, if the node has one
+     * @return The clp-s archive node type that should be used for the archive node
+     */
+    static auto get_archive_node_type(
+            clp::ffi::SchemaTree::Node::Type ir_node_type,
+            bool node_has_value,
+            std::optional<clp::ffi::Value> const& node_value
+    ) -> NodeType;
+
+    /**
+     * Gets the archive node ID that corresponds to an IR node.
+     * @param ir_node_to_archive_node_unordered_map Cache of IR-to-archive node ID conversions
+     * @param ir_node_id ID of the IR node
+     * @param archive_node_type Type of the archive node
+     * @param ir_tree The IR schema tree
+     * @return The archive node ID for the IR node
+     */
+    auto get_archive_node_id(
+            std::unordered_map<int32_t, std::vector<std::pair<NodeType, int32_t>>>&
+                    ir_node_to_archive_node_unordered_map,
+            uint32_t ir_node_id,
+            NodeType archive_node_type,
+            clp::ffi::SchemaTree const& ir_tree
+    ) -> int;
+
+    /**
+     * Parses a key-value pair log event.
+     * @param kv The key-value pair log event
+     * @param ir_node_to_archive_node_unordered_map Cache of node ID conversions between
+     * deserializer schema tree nodes and archive schema tree nodes
+     */
+    void parse_kv_log_event(
+            KeyValuePairLogEvent const& kv,
+            std::unordered_map<int32_t, std::vector<std::pair<NodeType, int32_t>>>&
+                    ir_node_to_archive_node_unordered_map
+    );
+
     /**
      * Parses an array within a JSON line
      * @param line the JSON array
diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp
index 5f4384a1c..727a80fd3 100644
--- a/components/core/src/clp_s/clp-s.cpp
+++ b/components/core/src/clp_s/clp-s.cpp
@@ -1,5 +1,6 @@
 #include <exception>
 #include <filesystem>
+#include <fstream>
 #include <iostream>
 #include <memory>
 #include <sstream>
@@ -11,6 +12,7 @@
 #include <spdlog/sinks/stdout_sinks.h>
 #include <spdlog/spdlog.h>
 
+#include "../clp/ffi/ir_stream/Serializer.hpp"
 #include "../clp/GlobalMySQLMetadataDB.hpp"
 #include "../clp/streaming_archive/ArchiveMetadata.hpp"
 #include "../reducer/network_utils.hpp"
@@ -36,6 +38,7 @@
 #include "Utils.hpp"
 
 using namespace clp_s::search;
+using clp::ffi::ir_stream::Serializer;
 using clp_s::cArchiveFormatDevelopmentVersionFlag;
 using clp_s::cEpochTimeMax;
 using clp_s::cEpochTimeMin;
@@ -50,6 +53,54 @@ namespace {
  */
 bool compress(CommandLineArguments const& command_line_arguments);
 
+template <typename encoded_variable_t>
+auto flush_and_clear_serializer_buffer(
+        Serializer<encoded_variable_t>& serializer,  // IR buffer is copied out, then cleared
+        std::vector<int8_t>& byte_buf  // Receives the serializer's buffered IR bytes (appended)
+) -> void;
+
+template <typename encoded_variable_t>
+auto unpack_and_serialize_msgpack_bytes(
+        std::vector<uint8_t> const& msgpack_bytes,  // Packed msgpack object; should hold a map
+        Serializer<encoded_variable_t>& serializer  // Serializer that receives the unpacked map
+) -> bool;  // false if an exception is caught, the object isn't a map, or serialization fails
+
+/**
+ * Given user specified options and a file path to a JSON file, calls the serializer on each JSON
+ * entry to serialize it into IR
+ * @param option
+ * @param path
+ * @return Whether serialization was successful
+ */
+template <typename T>
+auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path);
+
+/**
+ * Iterates over the input JSON files specified by the command line arguments to generate an IR
+ * file for each one.
+ * @param command_line_arguments
+ * @return Whether generation was successful
+ */
+auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool;
+
+/**
+ * Fills in a JsonParserOption instance based on command line user input
+ * @param command_line_arguments
+ * @param option
+ * @return Whether setup was successful
+ */
+auto setup_compression_options(
+        CommandLineArguments const& command_line_arguments,
+        clp_s::JsonParserOption& option
+) -> bool;
+
+/**
+ * Compresses the input IR files specified by the command line arguments into an archive.
+ * @param command_line_arguments
+ * @return Whether compression was successful
+ */
+auto ir_compress(CommandLineArguments const& command_line_arguments) -> bool;
+
 /**
  * Decompresses the archive specified by the given JsonConstructorOption.
  * @param json_constructor_option
@@ -119,6 +170,215 @@ bool compress(CommandLineArguments const& command_line_arguments) {
     return true;
 }
 
+template <typename encoded_variable_t>
+auto flush_and_clear_serializer_buffer(
+        Serializer<encoded_variable_t>& serializer,
+        std::vector<int8_t>& byte_buf
+) -> void {  // Appends the serializer's buffered IR bytes to byte_buf, then clears them
+    auto const ir_bytes = serializer.get_ir_buf_view();
+    byte_buf.insert(byte_buf.end(), ir_bytes.begin(), ir_bytes.end());
+    serializer.clear_ir_buf();  // Only cleared once the bytes have been copied out
+}
+
+template <typename encoded_variable_t>
+auto unpack_and_serialize_msgpack_bytes(
+        std::vector<uint8_t> const& msgpack_bytes,
+        Serializer<encoded_variable_t>& serializer
+) -> bool {
+    // Unpacks `msgpack_bytes` and serializes the resulting map. Returns false (after logging) if
+    // a std::exception is caught, and false if the object isn't a map or the serializer fails.
+    try {
+        auto const* const raw_bytes
+                = clp::size_checked_pointer_cast<char const>(msgpack_bytes.data());
+        auto const handle = msgpack::unpack(raw_bytes, msgpack_bytes.size());
+        auto const& unpacked = handle.get();
+        // Only a top-level map is handed to the serializer.
+        bool const is_map{msgpack::type::MAP == unpacked.type};
+        return is_map && serializer.serialize_msgpack_map(unpacked.via.map);
+    } catch (std::exception const& ex) {
+        SPDLOG_ERROR("Failed to unpack msgpack bytes: {}", ex.what());
+        return false;
+    }
+}
+
+// Serializes each line of the JSON file `path` into a zstd-compressed IR stream written to
+// `<option.irs_dir>/<input filename>.ir`; `T` is the serializer's encoded variable type.
+// Returns true on success; on failure an error is logged and false is returned.
+template <typename T>
+auto run_serializer(clp_s::JsonToIrParserOption const& option, std::string path) {
+    auto result{Serializer<T>::create()};
+    if (result.has_error()) {
+        SPDLOG_ERROR("Failed to create Serializer");
+        return false;
+    }
+    auto& serializer{result.value()};
+    std::vector<int8_t> ir_buf;
+    // Drain whatever the new serializer has already buffered (presumably the stream preamble).
+    flush_and_clear_serializer_buffer(serializer, ir_buf);
+
+    // Open the input first so an unreadable file is reported as a failure instead of leaving
+    // behind an empty output stream and returning success.
+    std::ifstream in_file{path, std::ifstream::in};
+    if (false == in_file.is_open()) {
+        SPDLOG_ERROR("Failed to open input file {}", path);
+        return false;
+    }
+
+    auto const filename{std::filesystem::path(path).filename().string()};
+    auto const out_path{option.irs_dir + "/" + filename + ".ir"};
+
+    clp_s::FileWriter out_file;
+    out_file.open(out_path, clp_s::FileWriter::OpenMode::CreateForWriting);
+    clp_s::ZstdCompressor zc;
+    try {
+        zc.open(out_file, option.compression_level);
+    } catch (clp_s::ZstdCompressor::OperationFailed& error) {
+        SPDLOG_ERROR("Failed to open ZSTDcompressor - {}", error.what());
+        in_file.close();
+        out_file.close();
+        return false;
+    }
+    // The compressor was opened on `out_file`, so it is always closed before the file is.
+    auto const close_all = [&]() -> void {
+        in_file.close();
+        zc.close();
+        out_file.close();
+    };
+    // Hands the buffered IR bytes to the compressor and empties the buffer.
+    auto const write_ir_buf = [&]() -> void {
+        zc.write(reinterpret_cast<char*>(ir_buf.data()), ir_buf.size());
+        zc.flush();
+        ir_buf.clear();
+    };
+    std::string line;
+    while (getline(in_file, line)) {
+        try {
+            auto const j_obj = nlohmann::json::parse(line);
+            if (false
+                == unpack_and_serialize_msgpack_bytes(
+                        nlohmann::json::to_msgpack(j_obj),
+                        serializer
+                ))
+            {
+                SPDLOG_ERROR("Failed to serialize msgpack bytes for line: {}", line);
+                close_all();
+                return false;
+            }
+            flush_and_clear_serializer_buffer(serializer, ir_buf);
+            if (ir_buf.size() >= option.max_ir_buffer_size) {
+                write_ir_buf();
+            }
+        } catch (nlohmann::json::parse_error const& e) {
+            SPDLOG_ERROR("JSON parsing error: {}", e.what());
+            close_all();
+            return false;
+        } catch (std::exception const& e) {
+            SPDLOG_ERROR("Error during serialization: {}", e.what());
+            close_all();
+            return false;
+        }
+    }
+    write_ir_buf();  // Write whatever is still buffered after the last line
+    close_all();
+    return true;
+}
+
+auto generate_ir(CommandLineArguments const& command_line_arguments) -> bool {
+    std::filesystem::path const out_dir{command_line_arguments.get_archives_dir()};
+
+    // The output directory may not exist yet, so try to create it up front
+    try {
+        std::filesystem::create_directory(out_dir.string());
+    } catch (std::exception& ex) {
+        SPDLOG_ERROR("Failed to create archives directory {} - {}", out_dir.string(), ex.what());
+        return false;
+    }
+    clp_s::JsonToIrParserOption option{};
+    option.file_paths = command_line_arguments.get_file_paths();
+    option.irs_dir = out_dir.string();
+    option.max_document_size = command_line_arguments.get_max_document_size();
+    option.max_ir_buffer_size = command_line_arguments.get_max_ir_buffer_size();
+    option.compression_level = command_line_arguments.get_compression_level();
+    option.encoding = command_line_arguments.get_encoding_type();
+
+    if (false == clp_s::FileUtils::validate_path(option.file_paths)) {
+        SPDLOG_ERROR("Invalid file path(s) provided");
+        return false;
+    }
+
+    // Gather the files that find_all_files reports for each requested path
+    std::vector<std::string> input_files;
+    for (auto& requested_path : option.file_paths) {
+        clp_s::FileUtils::find_all_files(requested_path, input_files);
+    }
+
+    // An encoding of 4 instantiates the serializer with int32_t; anything else uses int64_t.
+    // Conversion stops at the first file that fails.
+    bool const use_int32{4 == option.encoding};
+    for (auto& input_file : input_files) {
+        bool const converted = use_int32 ? run_serializer<int32_t>(option, input_file)
+                                         : run_serializer<int64_t>(option, input_file);
+        if (false == converted) {
+            return false;
+        }
+    }
+    return true;
+}
+
+auto setup_compression_options(
+        CommandLineArguments const& command_line_arguments,
+        clp_s::JsonParserOption& option
+) -> bool {
+    std::filesystem::path const out_dir{command_line_arguments.get_archives_dir()};
+    auto const out_dir_str{out_dir.string()};
+    // The archives directory may be missing, so attempt to create it before anything else
+    try {
+        std::filesystem::create_directory(out_dir_str);
+    } catch (std::exception& ex) {
+        SPDLOG_ERROR("Failed to create archives directory {} - {}", out_dir_str, ex.what());
+        return false;
+    }
+    // Copy the compression settings from the command line into `option`
+    option.file_paths = command_line_arguments.get_file_paths();
+    option.archives_dir = out_dir_str;
+    option.target_encoded_size = command_line_arguments.get_target_encoded_size();
+    option.max_document_size = command_line_arguments.get_max_document_size();
+    option.compression_level = command_line_arguments.get_compression_level();
+    option.timestamp_key = command_line_arguments.get_timestamp_key();
+    option.print_archive_stats = command_line_arguments.print_archive_stats();
+
+    // A metadata DB handle is only created when a DB config was supplied
+    auto const& metadata_db_config = command_line_arguments.get_metadata_db_config();
+    if (false == metadata_db_config.has_value()) {
+        return true;
+    }
+    auto const& db_config = metadata_db_config.value();
+    option.metadata_db = std::make_shared<clp::GlobalMySQLMetadataDB>(
+            db_config.get_metadata_db_host(),
+            db_config.get_metadata_db_port(),
+            db_config.get_metadata_db_username(),
+            db_config.get_metadata_db_password(),
+            db_config.get_metadata_db_name(),
+            db_config.get_metadata_table_prefix()
+    );
+    return true;
+}
+
+auto ir_compress(CommandLineArguments const& command_line_arguments) -> bool {
+    clp_s::JsonParserOption parser_option{};
+    if (false == setup_compression_options(command_line_arguments, parser_option)) {
+        return false;
+    }
+    // The archive is stored only if parsing the IR input succeeds
+    clp_s::JsonParser ir_parser(parser_option);
+    if (ir_parser.parse_from_ir()) {
+        ir_parser.store();
+        return true;
+    }
+    SPDLOG_ERROR("Encountered error while parsing input");
+    return false;
+}
+
 void decompress_archive(clp_s::JsonConstructorOption const& json_constructor_option) {
     clp_s::JsonConstructor constructor(json_constructor_option);
     constructor.store();
@@ -285,6 +545,14 @@ int main(int argc, char const* argv[]) {
         if (false == compress(command_line_arguments)) {
             return 1;
         }
+    } else if (CommandLineArguments::Command::IrCompress == command_line_arguments.get_command()) {
+        if (false == ir_compress(command_line_arguments)) {
+            return 1;
+        }
+    } else if (CommandLineArguments::Command::JsonToIr == command_line_arguments.get_command()) {
+        if (false == generate_ir(command_line_arguments)) {
+            return 1;
+        }
     } else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) {
         auto const& archives_dir = command_line_arguments.get_archives_dir();
         if (false == std::filesystem::is_directory(archives_dir)) {