Skip to content

Commit

Permalink
Added redis store integration
Browse files Browse the repository at this point in the history
  • Loading branch information
blakehatch committed Nov 14, 2023
1 parent 22abf90 commit c374aaf
Show file tree
Hide file tree
Showing 16 changed files with 2,994 additions and 486 deletions.
1,289 changes: 959 additions & 330 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

1,709 changes: 1,555 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ members = [
"gencargo/property_modifier_scheduler",
"gencargo/property_modifier_scheduler_test",
"gencargo/proto",
"gencargo/redis_store",
"gencargo/redis_store_test",
"gencargo/ref_store",
"gencargo/ref_store_test",
"gencargo/resource_info",
Expand Down Expand Up @@ -133,6 +135,8 @@ prost-build = "0.11.9"
prost-types = "0.11.9"
rand = "0.8.5"
rcgen = "0.11.3"
redis = { version = "0.23.3", features = ["tokio-comp", "tokio-rustls-comp"] }
redis-test = { version = "0.2.3", features = ["aio"] }
relative-path = "1.8.0"
rusoto_core = "0.48.0"
rusoto_mock = "=0.48.0"
Expand Down Expand Up @@ -210,6 +214,8 @@ platform_property_manager = { path = "gencargo/platform_property_manager" }
property_modifier_scheduler = { path = "gencargo/property_modifier_scheduler" }
property_modifier_scheduler_test = { path = "gencargo/property_modifier_scheduler_test" }
proto = { path = "gencargo/proto" }
redis_store = { path = "gencargo/redis_store" }
redis_store_test = { path = "gencargo/redis_store_test" }
ref_store = { path = "gencargo/ref_store" }
ref_store_test = { path = "gencargo/ref_store_test" }
resource_info = { path = "gencargo/resource_info" }
Expand Down
48 changes: 47 additions & 1 deletion cas/store/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ rust_library(
":ref_store",
":s3_store",
":shard_store",
":redis_store",
":size_partitioning_store",
":store",
":traits",
Expand All @@ -39,6 +40,7 @@ rust_library(
"//util:error",
"//util:metrics_utils",
"@crate_index//:futures",
"@crate_index//:redis"
],
)

Expand Down Expand Up @@ -307,6 +309,29 @@ rust_library(
],
)

rust_library(
name = "redis_store",
srcs = ["redis_store.rs"],
proc_macro_deps = ["@crate_index//:async-trait"],
visibility = ["//cas:__pkg__"],
deps = [
":traits",
"//config",
"//util:buf_channel",
"//util:common",
"//util:error",
"@crate_index//:futures",
"@crate_index//:tokio",
"@crate_index//:sha2",
"@crate_index//:bytes",
"@crate_index//:byteorder",
"@crate_index//:hex",
"@crate_index//:redis",
"@crate_index//:redis-test",
"@crate_index//:pretty_assertions",
]
)

rust_test(
name = "size_partitioning_store_test",
srcs = ["tests/size_partitioning_store_test.rs"],
Expand All @@ -322,6 +347,27 @@ rust_test(
],
)

rust_test(
name = "redis_store_test",
srcs = ["tests/redis_store_test.rs"],
deps = [
":traits",
":redis_store",
":s3_store",
"//config",
"//util:buf_channel",
"//util:async_fixed_buffer",
"//util:common",
"//util:error",
"@crate_index//:bytes",
"@crate_index//:hex",
"@crate_index//:redis",
"@crate_index//:tokio",
"@crate_index//:redis-test",
"@crate_index//:pretty_assertions",
],
)

rust_test(
name = "ref_store_test",
srcs = ["tests/ref_store_test.rs"],
Expand Down Expand Up @@ -498,4 +544,4 @@ rust_test(
"@crate_index//:rand",
"@crate_index//:tokio",
],
)
)
5 changes: 5 additions & 0 deletions cas/store/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use filesystem_store::FilesystemStore;
use grpc_store::GrpcStore;
use memory_store::MemoryStore;
use metrics_utils::Registry;
use redis_store::RedisStore;
use ref_store::RefStore;
use s3_store::S3Store;
use shard_store::ShardStore;
Expand All @@ -45,6 +46,10 @@ pub fn store_factory<'a>(
let store: Arc<dyn Store> = match backend {
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
StoreConfig::s3_store(config) => Arc::new(S3Store::new(config)?),
StoreConfig::redis_store(config) => {
let store: Arc<RedisStore<redis::aio::Connection>> = Arc::new(RedisStore::new(config.clone()).await?);
store
}
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
Expand Down
201 changes: 201 additions & 0 deletions cas/store/redis_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

use async_trait::async_trait;
use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use common::DigestInfo;
use error::{Code, Error, ResultExt};
use futures::future::try_join_all;
use hex::encode;
use redis::aio::ConnectionLike;
use redis::AsyncCommands;
use redis_test::{MockCmd, MockRedisConnection};
use traits::{StoreTrait, UploadSizeInfo};

#[async_trait]
pub trait RedisConnectionTrait: ConnectionLike + AsyncCommands + Send + Sync {
fn new_mock_connection(mock_commands: Vec<MockCmd>) -> Self;
async fn new_connection(url: &str) -> Result<Self, Error>;
}

#[async_trait]
impl RedisConnectionTrait for MockRedisConnection {
fn new_mock_connection(mock_commands: Vec<MockCmd>) -> Self {
MockRedisConnection::new(mock_commands)
}
async fn new_connection(_url: &str) -> Result<Self, Error> {
Err(Error::new(
Code::InvalidArgument,
"MockRedisConnection cannot create a real connection".to_string(),
))
}
}

#[async_trait]
impl RedisConnectionTrait for redis::aio::Connection {
fn new_mock_connection(_mock_commands: Vec<MockCmd>) -> Self {
panic!("Cannot create a mock connection from a real connection")
}
async fn new_connection(url: &str) -> Result<Self, Error> {
let client = redis::Client::open(url)?;
client.get_async_connection().await.map_err(Error::from)
}
}

pub struct RedisStore<T: RedisConnectionTrait + 'static> {
conn: Arc<Mutex<T>>,
}

impl<T: RedisConnectionTrait + 'static> RedisStore<T> {
pub async fn new(config: config::stores::RedisStore) -> Result<Self, Error> {
let conn = Self::configure_connection(&config).await?;
Ok(RedisStore { conn })
}

async fn configure_connection(config: &config::stores::RedisStore) -> Result<Arc<Mutex<T>>, Error> {
let conn = if let Some(use_mock) = config.use_mock {
if use_mock {
let mock_commands = config.mock_commands.clone().unwrap_or_default();
let mock_data = config.mock_data.clone().unwrap_or_default();
let mut mock_cmds = Vec::new();
let mut data_iter = mock_data.iter();
for cmd in &mock_commands {
let mock_cmd = match cmd.as_str() {
"SET" => {
let digest = data_iter
.next()
.ok_or(Error::new(Code::NotFound, "Missing digest for SET".to_string()))?;
let data = data_iter
.next()
.ok_or(Error::new(Code::NotFound, "Missing data for SET".to_string()))?;
MockCmd::new(redis::cmd(cmd).arg(digest).arg(data), Ok(1))
}
"EXISTS" => {
let digest = data_iter
.next()
.ok_or(Error::new(Code::NotFound, "Missing digest for EXISTS".to_string()))?;
MockCmd::new(redis::cmd(cmd).arg(digest), Ok(1))
}
"GET" => {
let digest = data_iter
.next()
.ok_or(Error::new(Code::NotFound, "Missing digest for GET".to_string()))?;
MockCmd::new(redis::cmd(cmd).arg(digest), Ok("14"))
}
_ => return Err(Error::new(Code::NotFound, format!("Unsupported command: {}", cmd))),
};
mock_cmds.push(mock_cmd);
}
Ok(Arc::new(Mutex::new(T::new_mock_connection(mock_cmds))))
} else {
T::new_connection(config.url.as_str())
.await
.map(|connection| Arc::new(Mutex::new(connection)))
}
} else {
T::new_connection(config.url.as_str())
.await
.map(|connection| Arc::new(Mutex::new(connection)))
};
conn
}
}

#[async_trait]
impl<T: RedisConnectionTrait + 'static> StoreTrait for RedisStore<T> {
async fn has_with_results(
self: Pin<&Self>,
digests: &[DigestInfo],
results: &mut [Option<usize>],
) -> Result<(), Error> {
// TODO: Do not collect use buffer_unordered(SOME_N_LIMIT)
let futures: Vec<_> = digests
.iter()
.enumerate()
.map(|(index, digest)| async move {
let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let exists: bool = conn.exists(&packed_hash).await?;
Ok::<(usize, _), Error>((index, exists))
})
.collect();

let results_vec: Vec<_> = try_join_all(futures).await?;

for (index, exists) in results_vec {
results[index] = Some(exists as usize);
}
Ok(())
}

async fn update(
self: Pin<&Self>,
digest: DigestInfo,
reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
) -> Result<(), Error> {
let size = match size_info {
UploadSizeInfo::ExactSize(size) => size,
UploadSizeInfo::MaxSize(size) => size,
};

let buffer = reader
.collect_all_with_size_hint(size)
.await
.err_tip(|| "Failed to collect all bytes from reader in redis_store::update")?;

let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let _: () = conn.set(&packed_hash, &buffer[..]).await?;
Ok(())
}

async fn get_part_ref(
self: Pin<&Self>,
digest: DigestInfo,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let conn = Arc::clone(&self.conn);
let mut conn = conn.lock().await;
let packed_hash = encode(digest.packed_hash);
let value = conn.get::<_, Vec<u8>>(&packed_hash).await?;
let default_len = value.len() - offset;
let bytes_wrapper = Bytes::from(value);
let length = length.unwrap_or(default_len).min(default_len);
if length > 0 {
writer
.send(bytes_wrapper.slice(offset..(offset + length)))
.await
.err_tip(|| "Failed to write data in Redis store")?;
}
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF in redis store get_part")?;
Ok(())
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}
Loading

0 comments on commit c374aaf

Please sign in to comment.