Skip to content

Commit

Permalink
Add Redis Store integration
Browse files Browse the repository at this point in the history
  • Loading branch information
blakehatch committed Mar 24, 2024
1 parent 682c4fe commit b79be6c
Show file tree
Hide file tree
Showing 13 changed files with 16,194 additions and 64 deletions.
15,601 changes: 15,601 additions & 0 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

197 changes: 133 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ pub enum StoreConfig {
/// side effects and is the most efficient way to use it.
grpc(GrpcStore),

/// Stores data in a Redis store or any stores compatible with Redis APIs.
///
/// This store pairs well with the SizePartitioning store to accept only
/// small data that is optimally sized to fit in the Redis store. As Redis
/// and other in-memory stores are generally fast as well it is also
/// a good pair with the FastSlow store.
redis_store(RedisStore),

/// Noop store is a store that sends streams into the void and all data
/// retrieval will return 404 (NotFound). This can be useful for cases
/// where you may need to partition your data and part of your data needs
Expand Down Expand Up @@ -584,6 +592,13 @@ pub enum ErrorCode {
// Note: This list is duplicated from nativelink-error/lib.rs.
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RedisStore {
/// The hostname or IP address of the Redis server can include username, passowrd, tls, and database number.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub url: String,
}

/// Retry configuration. This configuration is exponential and each iteration
/// a jitter as a percentage is applied of the calculated delay. For example:
/// ```rust,ignore
Expand Down
2 changes: 2 additions & 0 deletions nativelink-error/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ rust_library(
visibility = ["//visibility:public"],
deps = [
"//nativelink-proto",
#"@crates//:futures-rustls",
"@crates//:hex",
"@crates//:prost",
"@crates//:prost-types",
"@crates//:redis",
"@crates//:tokio",
"@crates//:tonic",
],
Expand Down
1 change: 1 addition & 0 deletions nativelink-error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ nativelink-proto = { path = "../nativelink-proto" }
hex = "0.4.3"
prost = "0.12.3"
prost-types = "0.12.3"
redis = { version = "0.25.2", features = ["tokio-comp", "tokio-rustls-comp"] }
tokio = { version = "1.36.0" }
tonic = { version = "0.11.0", features = ["gzip"] }
6 changes: 6 additions & 0 deletions nativelink-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ impl From<nativelink_proto::google::rpc::Status> for Error {
}
}

impl From<redis::RedisError> for Error {
fn from(err: redis::RedisError) -> Self {
Error::new(Code::Internal, format!("Redis error: {}", err))
}
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// A manual impl to reduce the noise of frequently empty fields.
Expand Down
9 changes: 9 additions & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ rust_library(
"src/memory_store.rs",
"src/noop_store.rs",
"src/ref_store.rs",
"src/mock_redis.rs",
"src/redis_store.rs",
"src/s3_store.rs",
"src/shard_store.rs",
"src/size_partitioning_store.rs",
Expand All @@ -48,13 +50,16 @@ rust_library(
"@crates//:bytes",
"@crates//:filetime",
"@crates//:futures",
#"@crates//:futures-rustls",
"@crates//:hex",
"@crates//:hyper",
"@crates//:hyper-rustls",
"@crates//:lz4_flex",
"@crates//:parking_lot",
"@crates//:prost",
"@crates//:rand",
"@crates//:redis",
"@crates//:redis-test",
"@crates//:serde",
"@crates//:sha2",
"@crates//:shellexpand",
Expand Down Expand Up @@ -85,6 +90,7 @@ rust_test_suite(
"tests/shard_store_test.rs",
"tests/size_partitioning_store_test.rs",
"tests/verify_store_test.rs",
"tests/redis_store_test.rs"
],
proc_macro_deps = [
"@crates//:async-trait",
Expand All @@ -103,12 +109,15 @@ rust_test_suite(
"@crates//:bytes",
"@crates//:filetime",
"@crates//:futures",
#"@crates//:futures-rustls",
"@crates//:http",
"@crates//:hyper",
"@crates//:memory-stats",
"@crates//:once_cell",
"@crates//:pretty_assertions",
"@crates//:rand",
"@crates//:redis",
"@crates//:redis-test",
"@crates//:sha2",
"@crates//:tokio",
"@crates//:tokio-stream",
Expand Down
2 changes: 2 additions & 0 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ lz4_flex = "0.11.2"
parking_lot = "0.12.1"
prost = "0.12.3"
rand = "0.8.5"
redis = { version = "0.25.2", features = ["tokio-comp", "tokio-rustls-comp"] }
redis-test = { version = "0.4.0", features = ["aio"] }
serde = "1.0.197"
sha2 = "0.10.8"
shellexpand = "3.1.0"
Expand Down
5 changes: 5 additions & 0 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use nativelink_error::Error;
use nativelink_util::health_utils::HealthRegistryBuilder;
use nativelink_util::metrics_utils::Registry;
use nativelink_util::store_trait::Store;
use redis::aio::MultiplexedConnection;

use crate::completeness_checking_store::CompletenessCheckingStore;
use crate::compression_store::CompressionStore;
Expand All @@ -32,6 +33,7 @@ use crate::filesystem_store::FilesystemStore;
use crate::grpc_store::GrpcStore;
use crate::memory_store::MemoryStore;
use crate::noop_store::NoopStore;
use crate::redis_store::RedisStore;
use crate::ref_store::RefStore;
use crate::s3_store::S3Store;
use crate::shard_store::ShardStore;
Expand All @@ -51,6 +53,9 @@ pub fn store_factory<'a>(
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?),
StoreConfig::redis_store(config) => {
Arc::new(RedisStore::<MultiplexedConnection>::new(config).await?)
}
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, None, None).await?,
Expand Down
2 changes: 2 additions & 0 deletions nativelink-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ pub mod fast_slow_store;
pub mod filesystem_store;
pub mod grpc_store;
pub mod memory_store;
mod mock_redis;
pub mod noop_store;
pub mod redis_store;
pub mod ref_store;
pub mod s3_store;
pub mod shard_store;
Expand Down
37 changes: 37 additions & 0 deletions nativelink-store/src/mock_redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use nativelink_error::{Code, Error};
use redis_test::MockRedisConnection;

use crate::redis_store::RedisConnectionTrait;

#[async_trait]
impl RedisConnectionTrait for MockRedisConnection {
type ConnType = MockRedisConnection;

async fn new_with_config(
_config: &nativelink_config::stores::RedisStore,
) -> Result<Self, Error> {
Err(Error::new(
Code::Unimplemented,
"new_with_connection method is not implemented".to_string(),
))
}

async fn new_with_connection(conn: Self::ConnType) -> Result<Self, Error> {
Ok(conn)
}
}
199 changes: 199 additions & 0 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::try_join_all;
use hex::encode;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use redis::aio::{ConnectionLike, MultiplexedConnection};
use redis::AsyncCommands;
use tokio::sync::Mutex;

#[async_trait]
pub trait RedisConnectionTrait: Sync + Send + Sized + 'static {
type ConnType: Send + Sync + Sized + 'static;

async fn new_with_config(config: &nativelink_config::stores::RedisStore)
-> Result<Self, Error>;

async fn new_with_connection(conn: Self::ConnType) -> Result<Self, Error>;
}

#[async_trait]
impl RedisConnectionTrait for MultiplexedConnection {
type ConnType = MultiplexedConnection;

async fn new_with_config(
config: &nativelink_config::stores::RedisStore,
) -> Result<Self, Error> {
let client = redis::Client::open(config.url.clone());
let connection = client?.get_multiplexed_tokio_connection().await?;
Ok(connection)
}

async fn new_with_connection(_conn: Self::ConnType) -> Result<Self, Error> {
Err(Error::new(
Code::Unimplemented,
"new_with_connection method is not implemented".to_string(),
))
}
}

pub struct RedisStore<T: RedisConnectionTrait + 'static> {
pub conn: Arc<Mutex<T>>,
}

impl<T: RedisConnectionTrait + 'static> RedisStore<T> {
pub async fn new(config: &nativelink_config::stores::RedisStore) -> Result<Self, Error> {
let connection = T::new_with_config(config)
.await
.map(|conn| Arc::new(Mutex::new(conn)))?;

Ok(RedisStore { conn: connection })
}

pub async fn new_with_connection(conn: T) -> Result<Self, Error> {
Ok(RedisStore {
conn: Arc::new(Mutex::new(conn)),
})
}
}

#[async_trait]
impl<T: RedisConnectionTrait + ConnectionLike + 'static> Store for RedisStore<T> {
async fn has_with_results(
self: Pin<&Self>,
digests: &[DigestInfo],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// TODO (Blake Hatch): Do not collect use buffer_unordered(SOME_N_LIMIT)
let futures: Vec<_> = digests
.iter()
.enumerate()
.map(|(index, digest)| async move {
let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let exists: bool = conn.exists(&packed_hash).await?;
Ok::<(usize, _), Error>((index, exists))
})
.collect();

let results_vec: Vec<_> = try_join_all(futures).await?;

for (index, exists) in results_vec {
results[index] = Some(exists as usize);
}
Ok(())
}

async fn update(
self: Pin<&Self>,
digest: DigestInfo,
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
let size = match size_info {
UploadSizeInfo::ExactSize(size) => size,
UploadSizeInfo::MaxSize(size) => size,
};

let buffer = reader
.collect_all_with_size_hint(size)
.await
.err_tip(|| "Failed to collect all bytes from reader in redis_store::update")?;

let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let _: () = conn.set(&packed_hash, &buffer[..]).await?;
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let value = conn.get::<_, Vec<u8>>(&packed_hash).await?;
let default_len = value.len() - offset;
let bytes_wrapper = Bytes::from(value);
let length = length.unwrap_or(default_len).min(default_len);
if length > 0 {
writer
.send(bytes_wrapper.slice(offset..(offset + length)))
.await
.err_tip(|| "Failed to write data in Redis store")?;
}
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF in redis store get_part")?;
Ok(())
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
registry.register_collector(Box::new(Collector::new(&self)));
}

fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
registry.register_indicator(self);
}
}

impl<T: RedisConnectionTrait + 'static> MetricsComponent for RedisStore<T> {
fn gather_metrics(&self, _c: &mut CollectorState) {}
}

#[async_trait]
impl<T: RedisConnectionTrait + ConnectionLike + 'static> HealthStatusIndicator for RedisStore<T> {
fn get_name(&self) -> &'static str {
"RedisStore"
}

async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
Store::check_health(Pin::new(self), namespace).await
}
}
Loading

0 comments on commit b79be6c

Please sign in to comment.