@@ -18,8 +18,8 @@ use anyhow::{bail, ensure, Context, Result};
1818use once_cell:: sync:: OnceCell ;
1919use rustix:: {
2020 fs:: {
21- fdatasync , flock, linkat, mkdirat, open, openat, readlinkat, statat, AtFlags , Dir ,
22- FileType , FlockOperation , Mode , OFlags , CWD ,
21+ flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs , AtFlags , Dir , FileType ,
22+ FlockOperation , Mode , OFlags , CWD ,
2323 } ,
2424 io:: { Errno , Result as ErrnoResult } ,
2525} ;
@@ -128,12 +128,18 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
128128 ///
129129 /// Same as `ensure_object` but runs the operation on a blocking thread pool
130130 /// to avoid blocking async tasks. Returns the fsverity digest of the object.
131+ ///
132+ /// For performance reasons, this function does *not* call fsync() or similar. After you're
133+ /// done with everything, call `Repository::sync_async()`.
131134 pub async fn ensure_object_async ( self : & Arc < Self > , data : Vec < u8 > ) -> Result < ObjectID > {
132135 let self_ = Arc :: clone ( self ) ;
133136 tokio:: task:: spawn_blocking ( move || self_. ensure_object ( & data) ) . await ?
134137 }
135138
136139 /// Given a blob of data, store it in the repository.
140+ ///
141+ /// For performance reasons, this function does *not* call fsync() or similar. After you're
142+ /// done with everything, call `Repository::sync()`.
137143 pub fn ensure_object ( & self , data : & [ u8 ] ) -> Result < ObjectID > {
138144 let dirfd = self . objects_dir ( ) ?;
139145 let id: ObjectID = compute_verity ( data) ;
@@ -179,14 +185,15 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
179185 let fd = ensure_dir_and_openat ( dirfd, & id. to_object_dir ( ) , OFlags :: RDWR | OFlags :: TMPFILE ) ?;
180186 let mut file = File :: from ( fd) ;
181187 file. write_all ( data) ?;
182- fdatasync ( & file) ?;
183-
184188 // We can't enable verity with an open writable fd, so re-open and close the old one.
185189 let ro_fd = open (
186190 proc_self_fd ( & file) ,
187191 OFlags :: RDONLY | OFlags :: CLOEXEC ,
188192 Mode :: empty ( ) ,
189193 ) ?;
194+ // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the
195+ // creation of a massive number of journal commits and is a performance disaster. We need
196+ // to coordinate this at a higher level. See .write_stream().
190197 drop ( file) ;
191198
192199 let ro_fd = match enable_verity_maybe_copy :: < ObjectID > ( dirfd, ro_fd. as_fd ( ) ) {
@@ -279,16 +286,36 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
279286 }
280287 }
281288
282- /// Write the given splitstream to the repository with the
283- /// provided content identifier and optional reference name.
289+ /// Write the given splitstream to the repository with the provided content identifier and
290+ /// optional reference name.
291+ ///
292+ /// This call contains an internal barrier that guarantees that, in event of a crash, either:
293+ /// - the named stream (by `content_identifier`) will not be available; or
294+ /// - the stream and all of its linked data will be available
295+ ///
296+ /// In other words: it will not be possible to boot a system which contained a stream named
297+ /// `content_identifier` but is missing linked streams or objects from that stream.
284298 pub fn write_stream (
285299 & self ,
286300 writer : SplitStreamWriter < ObjectID > ,
287301 content_identifier : & str ,
288302 reference : Option < & str > ,
289303 ) -> Result < ObjectID > {
290- let stream_path = Self :: format_stream_path ( content_identifier) ;
291304 let object_id = writer. done ( ) ?;
305+
306+ // Right now we have:
307+ // - all of the linked external objects and streams; and
308+ // - the binary data of this splitstream itself
309+ //
310+ // in the filesystem but not yet guaranteed to be synced to disk. This is OK because
311+ // nobody knows that the binary data of the splitstream is a splitstream yet: it could just
312+ // as well be a random data file contained in an OS image or something.
313+ //
314+ // We need to make sure that all of that makes it to the disk before the splitstream is
315+ // visible as a splitstream.
316+ self . sync ( ) ?;
317+
318+ let stream_path = Self :: format_stream_path ( content_identifier) ;
292319 let object_path = Self :: format_object_path ( & object_id) ;
293320 self . symlink ( & stream_path, & object_path) ?;
294321
@@ -625,6 +652,24 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
625652 Ok ( crate :: erofs:: reader:: collect_objects ( & data) ?)
626653 }
627654
655+ /// Makes sure all content is written to the repository.
656+ ///
657+ /// This is currently just syncfs() on the repository's root directory because we don't have
658+ /// any better options at present. This blocks until the data is written out.
659+ pub fn sync ( & self ) -> Result < ( ) > {
660+ syncfs ( & self . repository ) ?;
661+ Ok ( ( ) )
662+ }
663+
664+ /// Makes sure all content is written to the repository.
665+ ///
666+ /// This is currently just syncfs() on the repository's root directory because we don't have
667+ /// any better options at present. This won't return until the data is written out.
668+ pub async fn sync_async ( self : & Arc < Self > ) -> Result < ( ) > {
669+ let self_ = Arc :: clone ( self ) ;
670+ tokio:: task:: spawn_blocking ( move || self_. sync ( ) ) . await ?
671+ }
672+
628673 /// Perform a garbage collection operation.
629674 ///
630675 /// # Locking
0 commit comments