Skip to content

Commit

Permalink
Merge pull request #149 from 0xnim/import_optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
ReCore-sys authored Dec 31, 2024
2 parents f279f65 + a3c0b39 commit 5e27c5c
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 129 deletions.
39 changes: 39 additions & 0 deletions src/lib/storage/src/lmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,45 @@ impl LmdbBackend {
.expect("Failed to run tokio task")
}

pub async fn batch_upsert(
&self,
table: String,
data: Vec<(u128, Vec<u8>)>,
) -> Result<(), StorageError> {
let env = self.env.clone();
tokio::task::spawn_blocking(move || {
let mut rw_txn = env.write_txn()?;

// Open or create the database for the given table
let db = env.create_database::<U128<BigEndian>, Bytes>(&mut rw_txn, Some(&table))?;

// Create a map of keys and their associated values
let keymap: HashMap<u128, &Vec<u8>> = data.iter().map(|(k, v)| (*k, v)).collect();

// Iterate through the keys in sorted order
let mut sorted_keys: Vec<u128> = keymap.keys().cloned().collect();
sorted_keys.sort();

// Iterate through the sorted keys to perform upserts
for key in sorted_keys {
// Check if the key already exists
if db.get(&rw_txn, &key)?.is_some() {
// Update the value if it exists (you can modify this logic as needed)
db.put(&mut rw_txn, &key, keymap[&key])?;
} else {
// Insert the new key-value pair if the key doesn't exist
db.put(&mut rw_txn, &key, keymap[&key])?;
}
}

// Commit the transaction after all upserts are performed
rw_txn.commit()?;
Ok(())
})
.await
.expect("Failed to run tokio task")
}

pub async fn exists(&self, table: String, key: u128) -> Result<bool, StorageError> {
let env = self.env.clone();
tokio::task::spawn_blocking(move || {
Expand Down
26 changes: 26 additions & 0 deletions src/lib/world/src/db_functions.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// db_functions.rs
use crate::chunk_format::Chunk;
use crate::errors::WorldError;
use crate::World;
Expand Down Expand Up @@ -129,6 +130,31 @@ pub(crate) async fn save_chunk_internal(world: &World, chunk: Chunk) -> Result<(
Ok(())
}

pub(crate) async fn save_chunk_internal_batch(
world: &World,
chunks: Vec<Chunk>,
) -> Result<(), WorldError> {
// Prepare the batch data for the upsert
let mut batch_data = Vec::new();

for chunk in chunks.iter() {
// Compress the chunk and encode it
let as_bytes = world.compressor.compress(&bitcode::encode(chunk))?;
// Create the key for the chunk
let digest = create_key(chunk.dimension.as_str(), chunk.x, chunk.z);
// Collect the key-value pair into the batch data
batch_data.push((digest, as_bytes));
}

// Perform the batch upsert
world
.storage_backend
.batch_upsert("chunks".to_string(), batch_data)
.await?;

Ok(())
}

pub(crate) async fn load_chunk_internal(
world: &World,
compressor: &Compressor,
Expand Down
10 changes: 10 additions & 0 deletions src/lib/world/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ pub enum WorldError {
MissingBlockMapping(Palette),
#[error("Invalid memory map size: {0}")]
InvalidMapSize(u64),
#[error("Task Join Error: {0}")]
TaskJoinError(String),
}

// implemente AcquireError for WorldError
use tokio::sync::AcquireError;
impl From<AcquireError> for WorldError {
fn from(err: AcquireError) -> Self {
WorldError::TaskJoinError(err.to_string())
}
}

impl From<std::io::Error> for WorldError {
Expand Down
Loading

0 comments on commit 5e27c5c

Please sign in to comment.