Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions lib/vector-buffers/src/variants/disk_v2/io.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::{io, path::Path};

use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{
fs::OpenOptions,
io::{AsyncRead, AsyncWrite},
};

#[cfg(unix)]
const FILE_MODE_OWNER_RW_GROUP_RO: u32 = 0o640;

/// File metadata.
pub struct Metadata {
Expand Down Expand Up @@ -129,39 +135,32 @@ impl Filesystem for ProductionFilesystem {
type MutableMemoryMap = memmap2::MmapMut;

async fn open_file_writable(&self, path: &Path) -> io::Result<Self::File> {
tokio::fs::OpenOptions::new()
create_writable_file_options(false)
.append(true)
.read(true)
.create(true)
.open(path)
.await
}

async fn open_file_writable_atomic(&self, path: &Path) -> io::Result<Self::File> {
tokio::fs::OpenOptions::new()
create_writable_file_options(true)
.append(true)
.read(true)
.create_new(true)
.open(path)
.await
}

async fn open_file_readable(&self, path: &Path) -> io::Result<Self::File> {
tokio::fs::OpenOptions::new().read(true).open(path).await
open_readable_file_options().open(path).await
}

async fn open_mmap_readable(&self, path: &Path) -> io::Result<Self::MemoryMap> {
let file = tokio::fs::OpenOptions::new().read(true).open(path).await?;
let file = open_readable_file_options().open(path).await?;
let std_file = file.into_std().await;
unsafe { memmap2::Mmap::map(&std_file) }
}

async fn open_mmap_writable(&self, path: &Path) -> io::Result<Self::MutableMemoryMap> {
let file = tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.open(path)
.await?;
let file = open_writable_file_options().open(path).await?;

let std_file = file.into_std().await;
unsafe { memmap2::MmapMut::map_mut(&std_file) }
}
Expand All @@ -171,6 +170,53 @@ impl Filesystem for ProductionFilesystem {
}
}

/// Builds a set of `OpenOptions` for opening a file as readable/writable.
fn open_writable_file_options() -> OpenOptions {
let mut open_options = OpenOptions::new();
open_options.read(true).write(true);

#[cfg(unix)]
{
open_options.mode(FILE_MODE_OWNER_RW_GROUP_RO);
}

open_options
}

/// Builds a set of `OpenOptions` for opening a file as readable/writable, creating it if it does
/// not already exist.
///
/// When `create_atomic` is set to `true`, this ensures that the operation only succeeds if the
/// subsequent call to `open` is able to create the file, ensuring that another process did not
/// create it before us. Otherwise, the normal create mode is configured, which creates the file if
/// it does not exist but does not throw an error if it already did.
///
/// On Unix platforms, file permissions will be set so that only the owning user of the file can
/// write to it, the owning group can read it, and the file is inaccessible otherwise.
fn create_writable_file_options(create_atomic: bool) -> OpenOptions {
let mut open_options = open_writable_file_options();

#[cfg(unix)]
{
open_options.mode(FILE_MODE_OWNER_RW_GROUP_RO);
}

if create_atomic {
open_options.create_new(true);
} else {
open_options.create(true);
}

open_options
}

/// Builds a set of `OpenOptions` for opening a file as readable.
fn open_readable_file_options() -> OpenOptions {
let mut open_options = OpenOptions::new();
open_options.read(true);
open_options
}

#[async_trait]
impl AsyncFile for tokio::fs::File {
async fn metadata(&self) -> io::Result<Metadata> {
Expand Down