From 3e5b091f51fb35f9828e2fd0343890e36cc81292 Mon Sep 17 00:00:00 2001
From: Blake Hatch <48665344+blakehatch@users.noreply.github.com>
Date: Wed, 1 Nov 2023 12:33:18 +0000
Subject: [PATCH] Add Redis Store
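
Adds a RedisStore backend that stores CAS data in any service
compatible with the Redis API. Uploads are streamed into a temporary
key through pipelined APPEND commands and atomically RENAMEd to the
digest key on completion; reads are streamed back in 64KiB GETRANGE
chunks.

An illustrative configuration (the store name and URL below are
placeholders, not defaults):

  "stores": {
    "CAS_STORE": {
      "redis_store": {
        "addresses": ["redis://127.0.0.1:6379/"]
      }
    }
  }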

Signed-off-by: Blake Hatch <blakewihatch@gmail.com>
---
 Cargo.lock                                    | 101 ++++-
 Cargo.toml                                    |   3 +
 nativelink-config/src/stores.rs               |  31 +-
 nativelink-store/BUILD.bazel                  |   8 +
 nativelink-store/Cargo.toml                   |   3 +
 nativelink-store/src/cas_utils.rs             |   2 +-
 nativelink-store/src/default_store_factory.rs |   2 +
 nativelink-store/src/lib.rs                   |   1 +
 nativelink-store/src/redis_store.rs           | 358 ++++++++++++++++++
 nativelink-store/tests/redis_store_test.rs    | 299 +++++++++++++++
 nativelink-util/src/buf_channel.rs            |   5 +
 11 files changed, 810 insertions(+), 3 deletions(-)
 create mode 100644 nativelink-store/src/redis_store.rs
 create mode 100644 nativelink-store/tests/redis_store_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 74582e0b3..acb28c138 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -99,6 +99,18 @@ version = "1.0.80"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1"
 
+[[package]]
+name = "arc-cell"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fec9da9adf9420d86def101bd5b4a227b0512d456b6a128b0d677fdf68e5f7b8"
+
+[[package]]
+name = "arc-swap"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
 [[package]]
 name = "arrayref"
 version = "0.3.7"
@@ -792,6 +804,20 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
 
+[[package]]
+name = "combine"
+version = "4.6.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "memchr",
+ "pin-project-lite",
+ "tokio",
+ "tokio-util",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.4.0"
@@ -1481,7 +1507,7 @@ dependencies = [
  "hyper",
  "log",
  "rustls 0.21.10",
- "rustls-native-certs",
+ "rustls-native-certs 0.6.3",
  "tokio",
  "tokio-rustls 0.24.1",
  "webpki-roots",
@@ -1733,6 +1759,7 @@ version = "0.3.0"
 dependencies = [
  "async-lock",
  "axum",
+ "cc",
  "clap",
  "console-subscriber",
  "futures",
@@ -1850,6 +1877,7 @@ dependencies = [
 name = "nativelink-store"
 version = "0.3.0"
 dependencies = [
+ "arc-cell",
  "async-lock",
  "async-trait",
  "aws-config",
@@ -1879,6 +1907,8 @@ dependencies = [
  "pretty_assertions",
  "prost",
  "rand",
+ "redis",
+ "redis-test",
  "serde",
  "sha2",
  "shellexpand",
@@ -2375,6 +2405,45 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "redis"
+version = "0.25.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
+dependencies = [
+ "arc-swap",
+ "async-trait",
+ "bytes",
+ "combine",
+ "futures",
+ "futures-util",
+ "itoa",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustls 0.22.2",
+ "rustls-native-certs 0.7.0",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "ryu",
+ "sha1_smol",
+ "socket2",
+ "tokio",
+ "tokio-retry",
+ "tokio-rustls 0.25.0",
+ "tokio-util",
+ "url",
+]
+
+[[package]]
+name = "redis-test"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a948b3cec9e4b1fedbb0f0788e79029fb1f641b6cfefb7a15d044f803854427"
+dependencies = [
+ "futures",
+ "redis",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
@@ -2552,6 +2621,19 @@ dependencies = [
  "security-framework",
 ]
 
+[[package]]
+name = "rustls-native-certs"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
 [[package]]
 name = "rustls-pemfile"
 version = "1.0.4"
@@ -2754,6 +2836,12 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sha1_smol"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
+
 [[package]]
 name = "sha2"
 version = "0.10.8"
@@ -3023,6 +3111,17 @@ dependencies = [
  "syn 2.0.52",
 ]
 
+[[package]]
+name = "tokio-retry"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
+dependencies = [
+ "pin-project",
+ "rand",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.24.1"
diff --git a/Cargo.toml b/Cargo.toml
index b089b530b..61bf0e8e1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,9 @@ nativelink-worker = { path = "nativelink-worker" }
 
 async-lock = "3.3.0"
 axum = "0.6.20"
+
+# The cc crate must be pinned to be compatible with the zig-cc toolchain.
+cc = "1.0.83"
 clap = { version = "4.5.4", features = ["derive"] }
 console-subscriber = { version = "0.2.0" }
 futures = "0.3.30"
diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs
index cf57ebd0c..68d3f7553 100644
--- a/nativelink-config/src/stores.rs
+++ b/nativelink-config/src/stores.rs
@@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::serde_utils::{
     convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
-    convert_string_with_shellexpand,
+    convert_string_with_shellexpand, convert_vec_string_with_shellexpand,
 };
 
 /// Name of the store. This type will be used when referencing a store
@@ -163,6 +163,13 @@ pub enum StoreConfig {
     /// side effects and is the most efficient way to use it.
     grpc(GrpcStore),
 
+    /// Stores data in any service compatible with the Redis API.
+    ///
+    /// Pairs well with SizePartitioning and/or FastSlow stores.
+    /// Best suited to small objects, since most hosted Redis
+    /// services cap a single value at around 256MB-512MB.
+    redis_store(RedisStore),
+
     /// Noop store is a store that sends streams into the void and all data
     /// retrieval will return 404 (NotFound). This can be useful for cases
     /// where you may need to partition your data and part of your data needs
@@ -597,6 +604,28 @@ pub enum ErrorCode {
     // Note: This list is duplicated from nativelink-error/lib.rs.
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct RedisStore {
+    /// The connection string for the Redis server.
+    /// Ex: ["redis://username:password@redis-server-url:6380/99"]
+    /// where 6380 is the port and 99 is the database ID.
+    // Note: This is currently a single address, but multiple are
+    // accepted to support clusters in the future.
+    #[serde(deserialize_with = "convert_vec_string_with_shellexpand")]
+    pub addresses: Vec<String>,
+
+    /// The response timeout for the Redis connection in seconds.
+    ///
+    /// Default: 10
+    #[serde(default)]
+    pub response_timeout_s: u64,
+
+    /// The connection timeout for the Redis connection in seconds.
+    ///
+    /// Default: 10
+    #[serde(default)]
+    pub connection_timeout_s: u64,
+}
+
 /// Retry configuration. This configuration is exponential and each iteration
 /// a jitter as a percentage is applied of the calculated delay. For example:
 /// ```rust,ignore
diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel
index 21ac9c159..3d6099693 100644
--- a/nativelink-store/BUILD.bazel
+++ b/nativelink-store/BUILD.bazel
@@ -22,6 +22,7 @@ rust_library(
         "src/lib.rs",
         "src/memory_store.rs",
         "src/noop_store.rs",
+        "src/redis_store.rs",
         "src/ref_store.rs",
         "src/s3_store.rs",
         "src/shard_store.rs",
@@ -38,6 +39,7 @@ rust_library(
         "//nativelink-error",
         "//nativelink-proto",
         "//nativelink-util",
+        "@crates//:arc-cell",
         "@crates//:async-lock",
         "@crates//:aws-config",
         "@crates//:aws-sdk-s3",
@@ -56,6 +58,7 @@ rust_library(
         "@crates//:parking_lot",
         "@crates//:prost",
         "@crates//:rand",
+        "@crates//:redis",
         "@crates//:serde",
         "@crates//:sha2",
         "@crates//:shellexpand",
@@ -81,6 +84,7 @@ rust_test_suite(
         "tests/fast_slow_store_test.rs",
         "tests/filesystem_store_test.rs",
         "tests/memory_store_test.rs",
+        "tests/redis_store_test.rs",
         "tests/ref_store_test.rs",
         "tests/s3_store_test.rs",
         "tests/shard_store_test.rs",
@@ -105,6 +109,7 @@ rust_test_suite(
         "@crates//:bytes",
         "@crates//:filetime",
         "@crates//:futures",
+        "@crates//:hex",
         "@crates//:http",
         "@crates//:http-body",
         "@crates//:hyper",
@@ -112,9 +117,12 @@ rust_test_suite(
         "@crates//:once_cell",
         "@crates//:pretty_assertions",
         "@crates//:rand",
+        "@crates//:redis",
+        "@crates//:redis-test",
         "@crates//:sha2",
         "@crates//:tokio",
         "@crates//:tokio-stream",
+        "@crates//:uuid",
     ],
 )
 
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index bc8da14b2..3e05db17a 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -9,6 +9,7 @@ nativelink-config = { path = "../nativelink-config" }
 nativelink-util = { path = "../nativelink-util" }
 nativelink-proto = { path = "../nativelink-proto" }
 
+arc-cell = "0.3.3"
 async-lock = "3.3.0"
 async-trait = "0.1.80"
 aws-config = "1.1.9"
@@ -28,6 +29,7 @@ lz4_flex = "0.11.3"
 parking_lot = "0.12.1"
 prost = "0.12.4"
 rand = "0.8.5"
+redis = { version = "0.25.2", features = ["tokio-comp", "tokio-rustls-comp", "connection-manager"] }
 serde = "1.0.197"
 sha2 = "0.10.8"
 shellexpand = "3.1.0"
@@ -40,6 +42,7 @@ tracing = "0.1.40"
 uuid = { version = "1.8.0", features = ["v4"] }
 
 [dev-dependencies]
+redis-test = { version = "0.4.0", features = ["aio"] }
 pretty_assertions = "1.4.0"
 memory-stats = "1.1.0"
 once_cell = "1.19.0"
diff --git a/nativelink-store/src/cas_utils.rs b/nativelink-store/src/cas_utils.rs
index 77e9876e6..6ddf67330 100644
--- a/nativelink-store/src/cas_utils.rs
+++ b/nativelink-store/src/cas_utils.rs
@@ -14,7 +14,7 @@
 
 use nativelink_util::common::DigestInfo;
 
-const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
+pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
     // Sha256 hash of zero bytes.
     DigestInfo::new(
         [
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index cf64390fa..32a53031b 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -32,6 +32,7 @@ use crate::filesystem_store::FilesystemStore;
 use crate::grpc_store::GrpcStore;
 use crate::memory_store::MemoryStore;
 use crate::noop_store::NoopStore;
+use crate::redis_store::RedisStore;
 use crate::ref_store::RefStore;
 use crate::s3_store::S3Store;
 use crate::shard_store::ShardStore;
@@ -51,6 +52,7 @@ pub fn store_factory<'a>(
         let store: Arc<dyn Store> = match backend {
             StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
             StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?),
+            StoreConfig::redis_store(config) => Arc::new(RedisStore::new(config)?),
             StoreConfig::verify(config) => Arc::new(VerifyStore::new(
                 config,
                 store_factory(&config.backend, store_manager, None, None).await?,
diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs
index 843fb3799..c78da43d7 100644
--- a/nativelink-store/src/lib.rs
+++ b/nativelink-store/src/lib.rs
@@ -24,6 +24,7 @@ pub mod filesystem_store;
 pub mod grpc_store;
 pub mod memory_store;
 pub mod noop_store;
+pub mod redis_store;
 pub mod ref_store;
 pub mod s3_store;
 pub mod shard_store;
diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs
new file mode 100644
index 000000000..21fc52938
--- /dev/null
+++ b/nativelink-store/src/redis_store.rs
@@ -0,0 +1,358 @@
+// Copyright 2024 The NativeLink Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::borrow::Cow;
+use std::cell::OnceCell;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use arc_cell::ArcCell;
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::future::{BoxFuture, FutureExt, Shared};
+use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
+use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
+use nativelink_util::common::DigestInfo;
+use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
+use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
+use nativelink_util::store_trait::{Store, UploadSizeInfo};
+use redis::aio::{ConnectionLike, ConnectionManager};
+use redis::AsyncCommands;
+use tracing::{error_span, Instrument};
+
+use crate::cas_utils::is_zero_digest;
+
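+// Number of bytes fetched per GETRANGE call when streaming a value
+// back to the caller.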
+const READ_CHUNK_SIZE: isize = 64 * 1024;
+
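+/// Converts a digest into the Redis key the blob is stored under,
+/// in the form "<hex hash>-<size in bytes>".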
+fn digest_to_key(digest: &DigestInfo) -> String {
+    format!("{}-{}", digest.hash_str(), digest.size_bytes)
+}
+
+/// Holds a connection result or a future that resolves to a connection.
+/// This is a utility to allow us to start a connection but not block on it.
+pub enum LazyConnection<T: ConnectionLike + Unpin + Clone + Send + Sync> {
+    Connection(Result<T, Error>),
+    Future(Shared<BoxFuture<'static, Result<T, Error>>>),
+}
+
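+/// A `Store` implementation backed by a Redis-compatible service.
+/// The connection is established lazily so construction never blocks.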
+pub struct RedisStore<T: ConnectionLike + Unpin + Clone + Send + Sync = ConnectionManager> {
+    lazy_conn: ArcCell<LazyConnection<T>>,
+    temp_name_generator_fn: fn() -> String,
+}
+
+impl RedisStore {
+    pub fn new(
+        config: &nativelink_config::stores::RedisStore,
+    ) -> Result<RedisStore<ConnectionManager>, Error> {
+        // Note: Currently only one connection is supported.
+        error_if!(
+            config.addresses.len() != 1,
+            "Only one address is supported for Redis store"
+        );
+
+        let address = config.addresses[0].clone();
+        let conn_fut = async move {
+            redis::Client::open(address)
+                .map_err(from_redis_err)?
+                .get_connection_manager()
+                .await
+                .map_err(from_redis_err)
+        }
+        .boxed()
+        .shared();
+
+        let conn_fut_clone = conn_fut.clone();
+        // Start connecting to redis, but don't block our construction on it.
+        tokio::spawn(
+            async move {
+                if let Err(e) = conn_fut_clone.await {
+                    // Callers will see the same error when they await the
+                    // shared future; log it so a failed initial connection
+                    // is visible.
+                    tracing::error!("Failed to connect to Redis: {:?}", e);
+                }
+            }
+            .instrument(error_span!("redis_initial_connection")),
+        );
+
+        let lazy_conn = LazyConnection::Future(conn_fut);
+
+        Ok(RedisStore::new_with_conn_and_name_generator(
+            lazy_conn,
+            || uuid::Uuid::new_v4().to_string(),
+        ))
+    }
+}
+
+impl<T: ConnectionLike + Unpin + Clone + Send + Sync> RedisStore<T> {
+    pub fn new_with_conn_and_name_generator(
+        lazy_conn: LazyConnection<T>,
+        temp_name_generator_fn: fn() -> String,
+    ) -> RedisStore<T> {
+        RedisStore {
+            lazy_conn: ArcCell::new(Arc::new(lazy_conn)),
+            temp_name_generator_fn,
+        }
+    }
+
+    async fn get_conn(&self) -> Result<T, Error> {
+        let result = match self.lazy_conn.get().as_ref() {
+            LazyConnection::Connection(conn_result) => return conn_result.clone(),
+            LazyConnection::Future(fut) => fut.clone().await,
+        };
+        self.lazy_conn
+            .set(Arc::new(LazyConnection::Connection(result.clone())));
+        result
+    }
+}
+
+#[async_trait]
+impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> Store for RedisStore<T> {
+    async fn has_with_results(
+        self: Pin<&Self>,
+        digests: &[DigestInfo],
+        results: &mut [Option<usize>],
+    ) -> Result<(), Error> {
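+        // Per the RBE spec a zero-byte digest always exists, so the common
+        // single-digest case can be answered without touching Redis.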
+        if digests.len() == 1 && is_zero_digest(&digests[0]) {
+            results[0] = Some(0);
+            return Ok(());
+        }
+        let mut conn = self.get_conn().await?;
+
+        let mut pipe = redis::pipe();
+        pipe.atomic();
+
+        let mut zero_digest_indexes = Vec::new();
+        digests.iter().enumerate().for_each(|(index, digest)| {
+            if is_zero_digest(digest) {
+                zero_digest_indexes.push(index);
+            }
+
+            pipe.strlen(digest_to_key(digest));
+        });
+
+        let digest_sizes = pipe
+            .query_async::<_, Vec<usize>>(&mut conn)
+            .await
+            .map_err(from_redis_err)
+            .err_tip(|| "Error: Could not call pipeline in has_with_results")?;
+
+        error_if!(
+            digest_sizes.len() != results.len(),
+            "Mismatch in digest sizes and results length"
+        );
+
+        digest_sizes
+            .into_iter()
+            .zip(results.iter_mut())
+            .for_each(|(size, result)| {
+                *result = if size == 0 { None } else { Some(size) };
+            });
+
+        zero_digest_indexes.into_iter().for_each(|index| {
+            results[index] = Some(0);
+        });
+
+        Ok(())
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        digest: DigestInfo,
+        mut reader: DropCloserReadHalf,
+        _upload_size: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        let temp_key = OnceCell::new();
+        let make_temp_name = || format!("temp-{}", (self.temp_name_generator_fn)());
+        let mut conn = self.get_conn().await?;
+        let mut pipe = redis::pipe();
+        pipe.atomic();
+
+        'outer: loop {
+            let mut force_recv = true;
+
+            while force_recv || !reader.is_empty() {
+                let chunk = reader
+                    .recv()
+                    .await
+                    .err_tip(|| "Failed to recv chunk in update in redis store")?;
+
+                if chunk.is_empty() {
+                    if is_zero_digest(&digest) {
+                        return Ok(());
+                    }
+                    if force_recv {
+                        conn.append::<_, _, ()>(digest_to_key(&digest), &chunk[..])
+                            .await
+                            .map_err(from_redis_err)
+                            .err_tip(|| "In RedisStore::update() single chunk")?;
+                    }
+                    break 'outer;
+                }
+
+                pipe.cmd("APPEND")
+                    .arg(temp_key.get_or_init(make_temp_name))
+                    .arg(&chunk[..]);
+                force_recv = false;
+
+                // Give other tasks a chance to run to populate the reader's
+                // buffer if possible.
+                tokio::task::yield_now().await;
+            }
+
+            pipe.query_async::<_, ()>(&mut conn)
+                .await
+                .map_err(from_redis_err)
+                .err_tip(|| "In RedisStore::update::query_async")?;
+            pipe.clear();
+        }
+
+        pipe.cmd("RENAME")
+            .arg(temp_key.get_or_init(make_temp_name))
+            .arg(digest_to_key(&digest));
+        pipe.query_async::<_, ()>(&mut conn)
+            .await
+            .map_err(from_redis_err)
+            .err_tip(|| "In RedisStore::update")?;
+
+        Ok(())
+    }
+
+    async fn get_part_ref(
+        self: Pin<&Self>,
+        digest: DigestInfo,
+        writer: &mut DropCloserWriteHalf,
+        offset: usize,
+        length: Option<usize>,
+    ) -> Result<(), Error> {
+        // To follow the RBE spec we must treat any digest with zero
+        // size as existing.
+        if is_zero_digest(&digest) {
+            writer
+                .send_eof()
+                .err_tip(|| "Failed to send zero EOF in redis store get_part_ref")?;
+            return Ok(());
+        }
+
+        let mut conn = self.get_conn().await?;
+        if length == Some(0) {
+            let exists = conn
+                .exists::<_, bool>(digest_to_key(&digest))
+                .await
+                .map_err(from_redis_err)
+                .err_tip(|| "In RedisStore::get_part_ref::zero_exists")?;
+            if !exists {
+                return Err(make_err!(
+                    Code::NotFound,
+                    "Data not found in Redis store for digest: {}",
+                    digest_to_key(&digest)
+                ));
+            }
+            writer
+                .send_eof()
+                .err_tip(|| "Failed to write EOF in redis store get_part_ref")?;
+            return Ok(());
+        }
+
+        let mut current_start = isize::try_from(offset)
+            .err_tip(|| "Cannot convert offset to isize in RedisStore::get_part_ref()")?;
+        let max_length = isize::try_from(length.unwrap_or(isize::MAX as usize))
+            .err_tip(|| "Cannot convert length to isize in RedisStore::get_part_ref()")?;
+        let end_position = current_start.saturating_add(max_length);
+
+        loop {
+            // Note: Redis getrange is inclusive, so we need to subtract 1 from the end.
+            let current_end =
+                std::cmp::min(current_start.saturating_add(READ_CHUNK_SIZE), end_position) - 1;
+            let chunk = conn
+                .getrange::<_, Bytes>(digest_to_key(&digest), current_start, current_end)
+                .await
+                .map_err(from_redis_err)
+                .err_tip(|| "In RedisStore::get_part_ref::getrange")?;
+
+            if chunk.is_empty() {
+                writer
+                    .send_eof()
+                    .err_tip(|| "Failed to write EOF in redis store get_part")?;
+                break;
+            }
+
+            // Note: Redis getrange is inclusive, so we need to add 1 to the end.
+            let was_partial_data = chunk.len() as isize != current_end - current_start + 1;
+            current_start += chunk.len() as isize;
+            writer
+                .send(chunk)
+                .await
+                .err_tip(|| "Failed to write data in Redis store")?;
+
+            // If we got partial data or the exact requested number of bytes, we are done.
+            if writer.get_bytes_written() as isize == max_length || was_partial_data {
+                writer
+                    .send_eof()
+                    .err_tip(|| "Failed to write EOF in redis store get_part")?;
+
+                break;
+            }
+
+            error_if!(
+                writer.get_bytes_written() as isize > max_length,
+                "Data received exceeds requested length"
+            );
+        }
+
+        Ok(())
+    }
+
+    fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
+        self
+    }
+
+    fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
+        self
+    }
+
+    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send> {
+        self
+    }
+
+    fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
+        registry.register_collector(Box::new(Collector::new(&self)));
+    }
+
+    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
+        registry.register_indicator(self);
+    }
+}
+
+impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> MetricsComponent for RedisStore<T> {
+    fn gather_metrics(&self, _c: &mut CollectorState) {}
+}
+
+#[async_trait]
+impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> HealthStatusIndicator
+    for RedisStore<T>
+{
+    fn get_name(&self) -> &'static str {
+        "RedisStore"
+    }
+
+    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
+        Store::check_health(Pin::new(self), namespace).await
+    }
+}
+
+fn from_redis_err(call_res: redis::RedisError) -> Error {
+    make_err!(Code::Internal, "Redis Error: {call_res}")
+}
diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs
new file mode 100644
index 000000000..53b3b30e3
--- /dev/null
+++ b/nativelink-store/tests/redis_store_test.rs
@@ -0,0 +1,299 @@
+// Copyright 2024 The NativeLink Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::pin::Pin;
+
+use bytes::Bytes;
+use nativelink_error::Error;
+use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
+use nativelink_store::redis_store::{LazyConnection, RedisStore};
+use nativelink_util::common::DigestInfo;
+use nativelink_util::store_trait::Store;
+use redis::{Pipeline, RedisError};
+use redis_test::{MockCmd, MockRedisConnection};
+
+const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030";
+const TEMP_UUID: &str = "temp-550e8400-e29b-41d4-a716-446655440000";
+
+type Command = str;
+type Arg = str;
+type RedisResult<'a> = Result<&'a [redis::Value], RedisError>;
+
+fn mock_uuid_generator() -> String {
+    uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000")
+        .unwrap()
+        .to_string()
+}
+
+#[cfg(test)]
+mod redis_store_tests {
+    use nativelink_util::buf_channel::make_buf_channel_pair;
+    use nativelink_util::store_trait::UploadSizeInfo;
+
+    use super::*;
+
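+    /// Builds a MockRedisConnection that expects an ordered sequence of
+    /// pipelines and single commands, each with a canned reply.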
+    struct MockRedisConnectionBuilder {
+        mock_cmds: Vec<MockCmd>,
+    }
+
+    impl MockRedisConnectionBuilder {
+        fn new() -> Self {
+            MockRedisConnectionBuilder { mock_cmds: vec![] }
+        }
+
+        fn pipe(mut self, inputs: &[(&Command, &[&Arg], RedisResult)]) -> Self {
+            let mut pipe = Pipeline::new();
+            pipe.atomic();
+            let mut res_vec = vec![];
+            for (cmd, args, result) in inputs {
+                let mut command = redis::cmd(cmd);
+                for arg in args.iter() {
+                    command.arg(arg);
+                }
+                for res in result.as_ref().unwrap().iter() {
+                    res_vec.push(res.clone());
+                }
+                pipe.add_command(command);
+            }
+            self.mock_cmds.push(MockCmd::with_values(pipe, Ok(res_vec)));
+            self
+        }
+
+        fn cmd(mut self, cmd: &Command, args: &[&Arg], result: Result<&str, RedisError>) -> Self {
+            let mut cmd = redis::cmd(cmd);
+            for arg in args {
+                cmd.arg(arg);
+            }
+            self.mock_cmds.push(MockCmd::new(cmd, result));
+            self
+        }
+
+        fn build(self) -> MockRedisConnection {
+            MockRedisConnection::new(self.mock_cmds)
+        }
+    }
+
+    #[tokio::test]
+    async fn upload_and_get_data() -> Result<(), Error> {
+        let data = Bytes::from_static(b"14");
+
+        let digest = DigestInfo::try_new(VALID_HASH1, 2)?;
+        let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes);
+
+        let chunk_data = "14";
+
+        let redis_connection = MockRedisConnectionBuilder::new()
+            .pipe(&[("APPEND", &[TEMP_UUID, chunk_data], Ok(&[redis::Value::Nil]))])
+            .cmd("APPEND", &[&packed_hash_hex, ""], Ok(""))
+            .pipe(&[(
+                "RENAME",
+                &[TEMP_UUID, &packed_hash_hex],
+                Ok(&[redis::Value::Nil]),
+            )])
+            .pipe(&[(
+                "STRLEN",
+                &[&packed_hash_hex],
+                Ok(&[redis::Value::Bulk(vec![redis::Value::Int(2)])]),
+            )])
+            .cmd("GETRANGE", &[&packed_hash_hex, "0", "1"], Ok("14"))
+            .build();
+
+        let store = RedisStore::new_with_conn_and_name_generator(
+            LazyConnection::Connection(Ok(redis_connection)),
+            mock_uuid_generator,
+        );
+        let pinned_store: Pin<&RedisStore<MockRedisConnection>> = Pin::new(&store);
+
+        pinned_store.update_oneshot(digest, data.clone()).await?;
+
+        let result = pinned_store.has(digest).await?;
+        assert!(
+            result.is_some(),
+            "Expected redis store to have hash: {VALID_HASH1}",
+        );
+
+        let result = pinned_store
+            .get_part_unchunked(digest, 0, Some(data.len()))
+            .await?;
+
+        assert_eq!(result, data, "Expected redis store to have updated value");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn upload_empty_data() -> Result<(), Error> {
+        let data = Bytes::from_static(b"");
+
+        let digest = ZERO_BYTE_DIGESTS[0];
+
+        let redis_connection = MockRedisConnectionBuilder::new().build();
+
+        let store = RedisStore::new_with_conn_and_name_generator(
+            LazyConnection::Connection(Ok(redis_connection)),
+            mock_uuid_generator,
+        );
+        let pinned_store: Pin<&RedisStore<MockRedisConnection>> = Pin::new(&store);
+
+        pinned_store.update_oneshot(digest, data).await?;
+
+        let result = pinned_store.has(digest).await?;
+        assert!(
+            result.is_some(),
+            "Expected redis store to contain the zero-byte digest",
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_uploading_large_data() -> Result<(), Error> {
+        // Requires multiple chunks as data is larger than 64K
+        let data: Bytes = Bytes::from(vec![0u8; 65 * 1024]);
+
+        let digest = DigestInfo::try_new(VALID_HASH1, 1)?;
+        let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes);
+
+        let chunk_data = std::str::from_utf8(&data).unwrap().to_string();
+
+        let redis_connection = MockRedisConnectionBuilder::new()
+            .pipe(&[(
+                "APPEND",
+                &[TEMP_UUID, &chunk_data],
+                Ok(&[redis::Value::Nil]),
+            )])
+            .cmd(
+                "APPEND",
+                &[&packed_hash_hex, ""],
+                Ok(&hex::encode(&data[..])),
+            )
+            .pipe(&[(
+                "RENAME",
+                &[TEMP_UUID, &packed_hash_hex],
+                Ok(&[redis::Value::Nil]),
+            )])
+            .pipe(&[(
+                "STRLEN",
+                &[&packed_hash_hex],
+                Ok(&[redis::Value::Bulk(vec![redis::Value::Int(2)])]),
+            )])
+            .cmd(
+                "GETRANGE",
+                &[&packed_hash_hex, "0", "65535"],
+                Ok(&hex::encode(&data[..])),
+            )
+            .cmd(
+                "GETRANGE",
+                &[&packed_hash_hex, "65535", "65560"],
+                Ok(&hex::encode(&data[..])),
+            )
+            .build();
+
+        let store = RedisStore::new_with_conn_and_name_generator(
+            LazyConnection::Connection(Ok(redis_connection)),
+            mock_uuid_generator,
+        );
+        let pinned_store: Pin<&RedisStore<MockRedisConnection>> = Pin::new(&store);
+
+        pinned_store.update_oneshot(digest, data.clone()).await?;
+
+        let result = pinned_store.has(digest).await?;
+        assert!(
+            result.is_some(),
+            "Expected redis store to have hash: {VALID_HASH1}",
+        );
+
+        let get_result: Bytes = pinned_store
+            .get_part_unchunked(digest, 0, Some(data.len()))
+            .await?;
+
+        assert_eq!(
+            hex::encode(get_result).len(),
+            hex::encode(&data).len(),
+            "Expected redis store to have updated value",
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn yield_between_sending_packets_in_update() -> Result<(), Error> {
+        let data = Bytes::from(vec![0u8; 10 * 1024]);
+        let data_p1 = Bytes::from(vec![0u8; 6 * 1024]);
+        let data_p2 = Bytes::from(vec![0u8; 4 * 1024]);
+
+        let digest = DigestInfo::try_new(VALID_HASH1, 2)?;
+        let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes);
+
+        let redis_connection = MockRedisConnectionBuilder::new()
+            .pipe(&[
+                (
+                    "APPEND",
+                    &[TEMP_UUID, std::str::from_utf8(&data_p1).unwrap()],
+                    Ok(&[redis::Value::Nil]),
+                ),
+                (
+                    "APPEND",
+                    &[TEMP_UUID, std::str::from_utf8(&data_p2).unwrap()],
+                    Ok(&[redis::Value::Nil]),
+                ),
+            ])
+            .cmd("APPEND", &[&packed_hash_hex, ""], Ok(""))
+            .pipe(&[(
+                "RENAME",
+                &[TEMP_UUID, &packed_hash_hex],
+                Ok(&[redis::Value::Nil]),
+            )])
+            .pipe(&[(
+                "STRLEN",
+                &[&packed_hash_hex],
+                Ok(&[redis::Value::Bulk(vec![redis::Value::Int(2)])]),
+            )])
+            .cmd(
+                "GETRANGE",
+                &[&packed_hash_hex, "0", "10239"],
+                Ok(std::str::from_utf8(&data).unwrap()),
+            )
+            .build();
+
+        let store = RedisStore::new_with_conn_and_name_generator(
+            LazyConnection::Connection(Ok(redis_connection)),
+            mock_uuid_generator,
+        );
+        let pinned_store: Pin<&RedisStore<MockRedisConnection>> = Pin::new(&store);
+
+        let (mut tx, rx) = make_buf_channel_pair();
+        tx.send(data_p1).await?;
+        tokio::task::yield_now().await;
+        tx.send(data_p2).await?;
+        tx.send_eof()?;
+        pinned_store
+            .update(digest, rx, UploadSizeInfo::ExactSize(data.len()))
+            .await?;
+
+        let result = pinned_store.has(digest).await?;
+        assert!(
+            result.is_some(),
+            "Expected redis store to have hash: {VALID_HASH1}",
+        );
+
+        let result = pinned_store
+            .get_part_unchunked(digest, 0, Some(data.len()))
+            .await?;
+
+        assert_eq!(result, data, "Expected redis store to have updated value");
+
+        Ok(())
+    }
+}
diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs
index d113f8f25..0880a5588 100644
--- a/nativelink-util/src/buf_channel.rs
+++ b/nativelink-util/src/buf_channel.rs
@@ -185,6 +185,11 @@ pub struct DropCloserReadHalf {
 }
 
 impl DropCloserReadHalf {
+    /// Returns true if no data is queued and nothing is ready to be
+    /// received on the channel.
+    pub fn is_empty(&self) -> bool {
+        self.queued_data.is_empty() && self.rx.is_empty()
+    }
+
     /// Receive a chunk of data.
     pub async fn recv(&mut self) -> Result<Bytes, Error> {
         let maybe_chunk = match self.queued_data.pop_front() {