Skip to content

Commit

Permalink
Make quantity field human readable
Browse files Browse the repository at this point in the history
Add support for human-readable quantity fields in the JSON configs, e.g., 10KB -> 10000.
This implementation still allows these fields to be plain unsigned integers, so it can be seen as offering an alternative configuration syntax.
  • Loading branch information
matdexir committed May 19, 2024
1 parent 0a33c83 commit 0611fcc
Show file tree
Hide file tree
Showing 9 changed files with 495 additions and 62 deletions.
340 changes: 299 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions nativelink-config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load(
"rust_doc",
"rust_doc_test",
"rust_library",
"rust_test_suite",
)

rust_library(
Expand All @@ -16,11 +17,29 @@ rust_library(
],
visibility = ["//visibility:public"],
deps = [
"@crates//:byte-unit",
"@crates//:humantime",
"@crates//:serde",
"@crates//:shellexpand",
],
)

# Integration test suite for this package. It currently builds the single
# source file `tests/deserialization_test.rs` against the listed crates.
rust_test_suite(
name = "integration",
timeout = "short",
srcs = [
"tests/deserialization_test.rs",
],
deps = [
"//nativelink-config",
"//nativelink-error",
"@crates//:byte-unit",
"@crates//:humantime",
"@crates//:serde",
"@crates//:serde_json5",
],
)

rust_doc(
name = "docs",
crate = ":nativelink-config",
Expand Down
5 changes: 4 additions & 1 deletion nativelink-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ version = "0.4.0"
edition = "2021"

[dependencies]
serde = { version = "1.0.201", features = ["derive"] }
byte-unit = "5.1.4"
humantime = "2.1.0"
serde = { version = "1.0.198", features = ["derive"] }
serde_json5 = "0.1.0"
shellexpand = "3.1.0"
11 changes: 6 additions & 5 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::Deserialize;

use crate::schedulers::SchedulerConfig;
use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand,
convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_vec_string_with_shellexpand,
Expand Down Expand Up @@ -139,7 +140,7 @@ pub struct ByteStreamConfig {
/// 16KiB - 64KiB is optimal.
///
/// Defaults: 64KiB
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_bytes_per_stream: usize,

/// In the event a client disconnects while uploading a blob, we will hold
Expand All @@ -148,7 +149,7 @@ pub struct ByteStreamConfig {
/// the same blob.
///
/// Defaults: 10 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub persist_stream_on_disconnect_timeout: usize,
}

Expand Down Expand Up @@ -557,7 +558,7 @@ pub struct LocalWorkerConfig {
/// longer than this time limit, the task will be rejected. Value in seconds.
///
/// Default: 1200 (seconds / 20 mins)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub max_action_timeout: usize,

/// If timeout is handled in `entrypoint` or another wrapper script.
Expand Down Expand Up @@ -667,7 +668,7 @@ pub struct GlobalConfig {
/// a new file descriptor because the limit has been reached.
///
/// Default: 1000 (1 second)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub idle_file_descriptor_timeout_millis: u64,

/// This flag can be used to prevent metrics from being collected at runtime.
Expand Down Expand Up @@ -695,7 +696,7 @@ pub struct GlobalConfig {
/// digest.
///
/// Default: 1024*1024 (1MiB)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub default_digest_size_health_check: usize,
}

Expand Down
2 changes: 1 addition & 1 deletion nativelink-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

pub mod cas_server;
pub mod schedulers;
mod serde_utils;
pub mod serde_utils;
pub mod stores;
6 changes: 3 additions & 3 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use serde::Deserialize;

use crate::serde_utils::convert_numeric_with_shellexpand;
use crate::serde_utils::{convert_duration_with_shellexpand, convert_numeric_with_shellexpand};
use crate::stores::{GrpcEndpoint, Retry, StoreRefName};

#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -97,13 +97,13 @@ pub struct SimpleScheduler {
/// The amount of time to retain completed actions in memory for in case
/// a WaitExecution is called after the action has completed.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub retain_completed_for_s: u64,

/// Remove workers from pool once the worker has not responded in this
/// amount of time in seconds.
/// Default: 5 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub worker_timeout_s: u64,

/// If a job returns an internal error or times out this many times when
Expand Down
80 changes: 80 additions & 0 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::fmt;
use std::marker::PhantomData;
use std::str::FromStr;

use byte_unit::Byte;
use humantime::parse_duration;
use serde::{de, Deserialize, Deserializer};

/// Helper for serde macro so you can use shellexpand variables in the json configuration
Expand Down Expand Up @@ -138,3 +140,81 @@ pub fn convert_optional_string_with_shellexpand<'de, D: Deserializer<'de>>(
Ok(None)
}
}

pub fn convert_data_size_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
T: TryFrom<i64> + FromStr<Err = E>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
// define a visitor that deserializes
// `ActualData` encoded as json within a string
struct USizeVisitor<T: TryFrom<i64>>(PhantomData<T>);

impl<'de, T, FromStrErr> de::Visitor<'de> for USizeVisitor<T>
where
FromStrErr: fmt::Display,
T: TryFrom<i64> + FromStr<Err = FromStrErr>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string containing json data")
}

fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
v.try_into().map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
let expanded = (*shellexpand::env(v).map_err(de::Error::custom)?).to_string();
let byte_size = Byte::parse_str(expanded, true).map_err(de::Error::custom)?;
let byte_size_u128 = byte_size.as_u128();
T::try_from(byte_size_u128.try_into().map_err(de::Error::custom)?)
.map_err(de::Error::custom)
}
}

deserializer.deserialize_any(USizeVisitor::<T>(PhantomData::<T> {}))
}

pub fn convert_duration_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
T: TryFrom<i64> + FromStr<Err = E>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
// define a visitor that deserializes
// `ActualData` encoded as json within a string
struct USizeVisitor<T: TryFrom<i64>>(PhantomData<T>);

impl<'de, T, FromStrErr> de::Visitor<'de> for USizeVisitor<T>
where
FromStrErr: fmt::Display,
T: TryFrom<i64> + FromStr<Err = FromStrErr>,
<T as TryFrom<i64>>::Error: fmt::Display,
{
type Value = T;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string containing json data")
}

fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
v.try_into().map_err(de::Error::custom)
}

fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
let expanded = (*shellexpand::env(v).map_err(de::Error::custom)?).to_string();
let duration = parse_duration(&expanded).map_err(de::Error::custom)?;
let duration_secs = duration.as_secs();
T::try_from(duration_secs.try_into().map_err(de::Error::custom)?)
.map_err(de::Error::custom)
}
}

deserializer.deserialize_any(USizeVisitor::<T>(PhantomData::<T> {}))
}
23 changes: 12 additions & 11 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use serde::{Deserialize, Serialize};

use crate::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
convert_string_with_shellexpand, convert_vec_string_with_shellexpand,
};
Expand Down Expand Up @@ -203,7 +204,7 @@ pub struct ShardStore {
#[serde(deny_unknown_fields)]
pub struct SizePartitioningStore {
/// Size to partition the data on.
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(deserialize_with = "convert_data_size_with_shellexpand")]
pub size: u64,

/// Store to send data when object is < (less than) size.
Expand Down Expand Up @@ -243,7 +244,7 @@ pub struct FilesystemStore {
/// Buffer size to use when reading files. Generally this should be left
/// to the default value except for testing.
/// Default: 32k.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub read_buffer_size: u32,

/// Policy used to evict items out of the store. Failure to set this
Expand All @@ -255,7 +256,7 @@ pub struct FilesystemStore {
/// value is used to determine an entry's actual size on disk consumed
/// For a 4KB block size filesystem, a 1B file actually consumes 4KB
/// Default: 4096
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub block_size: u64,
}

Expand Down Expand Up @@ -297,7 +298,7 @@ pub struct DedupStore {
/// deciding where to partition the data.
///
/// Default: 65536 (64k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub min_size: u32,

/// A best-effort attempt will be made to keep the average size
Expand All @@ -311,13 +312,13 @@ pub struct DedupStore {
/// details.
///
/// Default: 262144 (256k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub normal_size: u32,

/// Maximum size a chunk is allowed to be.
///
/// Default: 524288 (512k)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_size: u32,

/// Due to implementation detail, we want to prefer to download
Expand Down Expand Up @@ -396,7 +397,7 @@ pub struct Lz4Config {
/// compression ratios.
///
/// Default: 65536 (64k).
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub block_size: u32,

/// Maximum size allowed to attempt to deserialize data into.
Expand All @@ -407,7 +408,7 @@ pub struct Lz4Config {
/// allow you to specify the maximum that we'll attempt deserialize.
///
/// Default: value in `block_size`.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_decode_block_size: u32,
}

Expand Down Expand Up @@ -447,19 +448,19 @@ pub struct CompressionStore {
pub struct EvictionPolicy {
/// Maximum number of bytes before eviction takes place.
/// Default: 0. Zero means never evict based on size.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub max_bytes: usize,

/// When eviction starts based on hitting max_bytes, continue until
/// max_bytes - evict_bytes is met to create a low watermark. This stops
/// operations from thrashing when the store is close to the limit.
/// Default: 0
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub evict_bytes: usize,

/// Maximum number of seconds for an entry to live before an eviction.
/// Default: 0. Zero means never evict based on time.
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub max_seconds: u32,

/// Maximum size of the store before an eviction takes place.
Expand Down
71 changes: 71 additions & 0 deletions nativelink-config/tests/deserialization_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_config::serde_utils::{
convert_data_size_with_shellexpand, convert_duration_with_shellexpand,
};
use serde::Deserialize;

/// Test fixture whose single field is deserialized through
/// `convert_duration_with_shellexpand`.
#[derive(Deserialize)]
struct DurationEntity {
// Filled in by the duration helper under test.
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
duration: usize,
}

/// Test fixture whose single field is deserialized through
/// `convert_data_size_with_shellexpand`.
#[derive(Deserialize)]
struct DataSizeEntity {
// Filled in by the data size helper under test.
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
data_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A human-readable duration string is turned into a number of seconds.
    #[test]
    fn test_duration_human_readable_deserialize() {
        let input = r#"
{"duration": "1m 10s"}
"#;
        let parsed = serde_json5::from_str::<DurationEntity>(input).unwrap();
        assert_eq!(parsed.duration, 70);
    }

    /// A plain integer duration is passed through unchanged.
    #[test]
    fn test_duration_usize_deserialize() {
        let input = r#"
{"duration": 10}
"#;
        let parsed = serde_json5::from_str::<DurationEntity>(input).unwrap();
        assert_eq!(parsed.duration, 10);
    }

    /// A data size with a unit suffix is turned into a number of bytes.
    #[test]
    fn test_data_size_unit_deserialize() {
        let input = r#"
{"data_size": "1KiB"}
"#;
        let parsed = serde_json5::from_str::<DataSizeEntity>(input).unwrap();
        assert_eq!(parsed.data_size, 1024);
    }

    /// A plain integer data size is passed through unchanged.
    #[test]
    fn test_data_size_usize_deserialize() {
        let input = r#"
{"data_size": 10}
"#;
        let parsed = serde_json5::from_str::<DataSizeEntity>(input).unwrap();
        assert_eq!(parsed.data_size, 10);
    }
}

0 comments on commit 0611fcc

Please sign in to comment.