Skip to content

Commit

Permalink
feat(clp-package): Add support for uploading extracted streams to S3. (
Browse files Browse the repository at this point in the history
…#662)

Co-authored-by: Devin Gibson <gibber9809@users.noreply.github.com>
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
3 people authored Jan 17, 2025
1 parent a5268d6 commit 5c96bdc
Showing 16 changed files with 229 additions and 78 deletions.
13 changes: 6 additions & 7 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
@@ -254,17 +253,17 @@ def generate_container_config(
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
container_clp_config.stream_output.set_directory(pathlib.Path("/") / "mnt" / "stream-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
clp_config.stream_output.get_directory(),
container_clp_config.stream_output.get_directory(),
):
docker_mounts.stream_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
clp_config.stream_output.get_directory(),
container_clp_config.stream_output.get_directory(),
)

return container_clp_config, docker_mounts
@@ -276,7 +275,7 @@ def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

worker_config.stream_output_dir = clp_config.stream_output.directory
worker_config.stream_output = clp_config.stream_output
worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name

return worker_config
Original file line number Diff line number Diff line change
@@ -704,9 +704,10 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.stream_output.get_directory().mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
container_worker_log_path = container_logs_dir / "worker.log"
# fmt: off
container_start_cmd = [
"docker", "run",
@@ -729,6 +730,7 @@ def generic_start_worker(
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_WORKER_LOG_PATH={container_worker_log_path}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on
@@ -760,7 +762,7 @@ def generic_start_worker(
"--loglevel",
"WARNING",
"-f",
str(container_logs_dir / "worker.log"),
str(container_worker_log_path),
"-Q",
celery_route,
"-n",
@@ -922,7 +924,7 @@ def start_log_viewer_webui(
"MongoDbName": clp_config.results_cache.db_name,
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
"ClientDir": str(container_log_viewer_webui_dir / "client"),
"StreamFilesDir": str(container_clp_config.stream_output.directory),
"StreamFilesDir": str(container_clp_config.stream_output.get_directory()),
"StreamTargetUncompressedSize": container_clp_config.stream_output.target_uncompressed_size,
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
}
119 changes: 80 additions & 39 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pathlib
from enum import auto
from typing import Literal, Optional, Union
from typing import Literal, Optional, Tuple, Union

from dotenv import dotenv_values
from pydantic import BaseModel, PrivateAttr, validator
@@ -40,6 +40,7 @@
OS_RELEASE_FILE_PATH = pathlib.Path("etc") / "os-release"

CLP_DEFAULT_CREDENTIALS_FILE_PATH = pathlib.Path("etc") / "credentials.yml"
CLP_DEFAULT_DATA_DIRECTORY_PATH = pathlib.Path("var") / "data"
CLP_METADATA_TABLE_PREFIX = "clp_"


@@ -309,13 +310,29 @@ class Queue(BaseModel):
password: Optional[str]


class S3Credentials(BaseModel):
    """Long-term AWS credentials used to authenticate S3 requests.

    Both fields are required and must be non-empty strings.
    """

    access_key_id: str
    secret_access_key: str

    @validator("access_key_id")
    def _ensure_access_key_id_nonempty(cls, field):
        # An empty key id would silently fall back to anonymous access downstream.
        if field != "":
            return field
        raise ValueError("access_key_id cannot be empty")

    @validator("secret_access_key")
    def _ensure_secret_access_key_nonempty(cls, field):
        if field != "":
            return field
        raise ValueError("secret_access_key cannot be empty")


class S3Config(BaseModel):
region_code: str
bucket: str
key_prefix: str

access_key_id: Optional[str] = None
secret_access_key: Optional[str] = None
credentials: Optional[S3Credentials]

@validator("region_code")
def validate_region_code(cls, field):
@@ -337,10 +354,15 @@ def validate_key_prefix(cls, field):
raise ValueError('key_prefix must end with "/"')
return field

def get_credentials(self) -> Tuple[Optional[str], Optional[str]]:
if self.credentials is None:
return None, None
return self.credentials.access_key_id, self.credentials.secret_access_key


class FsStorage(BaseModel):
type: Literal[StorageType.FS.value] = StorageType.FS.value
directory: pathlib.Path = pathlib.Path("var") / "data" / "archives"
directory: pathlib.Path

@validator("directory")
def validate_directory(cls, field):
@@ -359,7 +381,7 @@ def dump_to_primitive_dict(self):

class S3Storage(BaseModel):
type: Literal[StorageType.S3.value] = StorageType.S3.value
staging_directory: pathlib.Path = pathlib.Path("var") / "data" / "staged_archives"
staging_directory: pathlib.Path
s3_config: S3Config

@validator("staging_directory")
@@ -377,8 +399,46 @@ def dump_to_primitive_dict(self):
return d


class ArchiveFsStorage(FsStorage):
    # Default location for archives stored on the local filesystem.
    directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "archives"


class StreamFsStorage(FsStorage):
    # Default location for extracted streams stored on the local filesystem.
    directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "streams"


class ArchiveS3Storage(S3Storage):
    # Local staging area for archives before upload to S3.
    staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-archives"


class StreamS3Storage(S3Storage):
    # Local staging area for extracted streams before upload to S3.
    staging_directory: pathlib.Path = CLP_DEFAULT_DATA_DIRECTORY_PATH / "staged-streams"


def _get_directory_from_storage_config(storage_config: Union[FsStorage, S3Storage]) -> pathlib.Path:
    """Return the local directory backing `storage_config`.

    For FS storage this is the output directory itself; for S3 storage it is the
    local staging directory.

    :raises NotImplementedError: If the storage type is unrecognized.
    """
    storage_type = storage_config.type
    if storage_type == StorageType.FS:
        return storage_config.directory
    if storage_type == StorageType.S3:
        return storage_config.staging_directory
    raise NotImplementedError(f"storage.type {storage_type} is not supported")


def _set_directory_for_storage_config(
    storage_config: Union[FsStorage, S3Storage], directory
) -> None:
    """Set the local directory backing `storage_config`.

    For FS storage this sets the output directory; for S3 storage it sets the
    local staging directory.

    :raises NotImplementedError: If the storage type is unrecognized.
    """
    storage_type = storage_config.type
    if storage_type == StorageType.FS:
        attr_name = "directory"
    elif storage_type == StorageType.S3:
        attr_name = "staging_directory"
    else:
        raise NotImplementedError(f"storage.type {storage_type} is not supported")
    setattr(storage_config, attr_name, directory)


class ArchiveOutput(BaseModel):
storage: Union[FsStorage, S3Storage] = FsStorage()
storage: Union[ArchiveFsStorage, ArchiveS3Storage] = ArchiveFsStorage()
target_archive_size: int = 256 * 1024 * 1024 # 256 MB
target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB
target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB
@@ -409,55 +469,36 @@ def validate_target_segment_size(cls, field):
return field

def set_directory(self, directory: pathlib.Path):
storage_config = self.storage
storage_type = storage_config.type
if StorageType.FS == storage_type:
storage_config.directory = directory
elif StorageType.S3 == storage_type:
storage_config.staging_directory = directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")
_set_directory_for_storage_config(self.storage, directory)

def get_directory(self) -> pathlib.Path:
storage_config = self.storage
storage_type = storage_config.type
if StorageType.FS == storage_config.type:
return storage_config.directory
elif StorageType.S3 == storage_type:
return storage_config.staging_directory
else:
raise NotImplementedError(f"storage.type {storage_type} is not supported")
return _get_directory_from_storage_config(self.storage)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["storage"] = self.storage.dump_to_primitive_dict()
return d


class StreamOutput(BaseModel):
directory: pathlib.Path = pathlib.Path("var") / "data" / "streams"
storage: Union[StreamFsStorage, StreamS3Storage] = StreamFsStorage()
target_uncompressed_size: int = 128 * 1024 * 1024

@validator("directory")
def validate_directory(cls, field):
if "" == field:
raise ValueError("directory cannot be empty")
return field

@validator("target_uncompressed_size")
def validate_target_uncompressed_size(cls, field):
if field <= 0:
raise ValueError("target_uncompressed_size must be greater than 0")
return field

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.directory = make_config_path_absolute(clp_home, self.directory)
def set_directory(self, directory: pathlib.Path):
_set_directory_for_storage_config(self.storage, directory)

def get_directory(self) -> pathlib.Path:
return _get_directory_from_storage_config(self.storage)

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["directory"] = str(d["directory"])
d["storage"] = self.storage.dump_to_primitive_dict()
return d


@@ -527,7 +568,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.storage.make_config_paths_absolute(clp_home)
self.stream_output.make_config_paths_absolute(clp_home)
self.stream_output.storage.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)
@@ -557,9 +598,9 @@ def validate_archive_output_config(self):

def validate_stream_output_dir(self):
try:
validate_path_could_be_dir(self.stream_output.directory)
validate_path_could_be_dir(self.stream_output.get_directory())
except ValueError as ex:
raise ValueError(f"stream_output.directory is invalid: {ex}")
raise ValueError(f"stream_output.storage's directory is invalid: {ex}")

def validate_data_dir(self):
try:
@@ -643,7 +684,7 @@ class WorkerConfig(BaseModel):
data_directory: pathlib.Path = CLPConfig().data_directory

# Only needed by query workers.
stream_output_dir: pathlib.Path = StreamOutput().directory
stream_output: StreamOutput = StreamOutput()
stream_collection_name: str = ResultsCache().stream_collection_name

def dump_to_primitive_dict(self):
@@ -652,6 +693,6 @@ def dump_to_primitive_dict(self):

# Turn paths into primitive strings
d["data_directory"] = str(self.data_directory)
d["stream_output_dir"] = str(self.stream_output_dir)
d["stream_output"] = self.stream_output.dump_to_primitive_dict()

return d
5 changes: 3 additions & 2 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
Original file line number Diff line number Diff line change
@@ -124,12 +124,13 @@ def s3_put(
)

config = Config(retries=dict(total_max_attempts=total_max_attempts, mode="adaptive"))
aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()

my_s3_client = boto3.client(
"s3",
region_name=s3_config.region_code,
aws_access_key_id=s3_config.access_key_id,
aws_secret_access_key=s3_config.secret_access_key,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
config=config,
)

17 changes: 11 additions & 6 deletions components/core/src/clp/clo/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
@@ -179,12 +179,17 @@ auto CommandLineArguments::parse_ir_extraction_arguments(
// Define IR extraction options
po::options_description options_ir_extraction("IR Extraction Options");
// clang-format off
options_ir_extraction
.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)->value_name("SIZE"),
"Target size (B) for each IR chunk before a new chunk is created"
);
options_ir_extraction.add_options()(
"target-size",
po::value<size_t>(&m_ir_target_size)
->value_name("SIZE")
->default_value(m_ir_target_size),
"Target size (B) for each IR chunk before a new chunk is created"
)(
"print-ir-stats",
po::bool_switch(&m_print_ir_stats),
"Print statistics (ndjson) about each IR file after it's extracted"
);
// clang-format on

// Define visible options
3 changes: 3 additions & 0 deletions components/core/src/clp/clo/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
@@ -48,6 +48,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {
[[nodiscard]] auto get_archive_path() const -> std::string_view { return m_archive_path; }

// IR extraction arguments
[[nodiscard]] auto print_ir_stats() const -> bool { return m_print_ir_stats; }

[[nodiscard]] auto get_file_split_id() const -> std::string const& { return m_file_split_id; }

[[nodiscard]] size_t get_ir_target_size() const { return m_ir_target_size; }
@@ -180,6 +182,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_archive_path;

// Variables for IR extraction
bool m_print_ir_stats{false};
std::string m_file_split_id;
size_t m_ir_target_size{128ULL * 1024 * 1024};
std::string m_ir_output_dir;
9 changes: 9 additions & 0 deletions components/core/src/clp/clo/clo.cpp
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
#include <memory>
#include <string>

#include <json/single_include/nlohmann/json.hpp>
#include <mongocxx/instance.hpp>
#include <spdlog/sinks/stdout_sinks.h>

@@ -211,6 +212,14 @@ bool extract_ir(CommandLineArguments const& command_line_args) {
is_last_chunk
)
)));

if (command_line_args.print_ir_stats()) {
nlohmann::json json_msg;
json_msg["path"] = dest_ir_path;
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
<< std::endl;
}

return true;
};

Loading

0 comments on commit 5c96bdc

Please sign in to comment.