@@ -234,13 +234,14 @@ def _get(self, i):
234
234
235
235
fcntl .flock (self .cache_fd , fcntl .LOCK_SH )
236
236
with record (f"pfio.cache.multiprocessfile:get:lock-{ self .cache_fd } " , trace = self .trace ):
237
- index_entry = os .pread (self .cache_fd , self .buflen , offset )
237
+ with record ("pfio.cache.multiprocessfile:get:read_index" , trace = self .trace ):
238
+ index_entry = os .pread (self .cache_fd , self .buflen , offset )
238
239
(o , l ) = unpack ('Qq' , index_entry )
239
240
if l < 0 or o < 0 :
240
241
fcntl .flock (self .cache_fd , fcntl .LOCK_UN )
241
242
return None
242
243
243
- with record ("pfio.cache.multiprocessfile:get:read " , trace = self .trace ):
244
+ with record ("pfio.cache.multiprocessfile:get:read_data " , trace = self .trace ):
244
245
data = os .pread (self .cache_fd , l , o )
245
246
assert len (data ) == l
246
247
fcntl .flock (self .cache_fd , fcntl .LOCK_UN )
@@ -279,26 +280,30 @@ def _put(self, i, data):
279
280
280
281
fcntl .flock (self .cache_fd , fcntl .LOCK_EX )
281
282
with record (f"pfio.cache.multiprocessfile:put:lock-{ self .cache_fd } " , trace = self .trace ):
282
- buf = os .pread (self .cache_fd , self .buflen , index_ofst )
283
+ with record ("pfio.cache.multiprocessfile:put:read_index" , trace = self .trace ):
284
+ buf = os .pread (self .cache_fd , self .buflen , index_ofst )
283
285
(o , l ) = unpack ('Qq' , buf )
284
286
285
287
if l >= 0 and o >= 0 :
286
288
# Already data exists
287
289
fcntl .flock (self .cache_fd , fcntl .LOCK_UN )
288
290
return False
289
291
290
- data_pos = os .lseek (self .cache_fd , 0 , os .SEEK_END )
292
+ with record ("pfio.cache.multiprocessfile:put:seek" , trace = self .trace ):
293
+ data_pos = os .lseek (self .cache_fd , 0 , os .SEEK_END )
291
294
if self .cache_size_limit :
292
295
if self .cache_size_limit < (data_pos + len (data )):
293
296
self ._frozen = True
294
297
fcntl .flock (self .cache_fd , fcntl .LOCK_UN )
295
298
return False
296
299
297
300
index_entry = pack ('Qq' , data_pos , len (data ))
298
- assert os .pwrite (self .cache_fd , index_entry , index_ofst ) == self .buflen
299
- with record ("pfio.cache.multiprocessfile:put:write" , trace = self .trace ):
301
+ with record ("pfio.cache.multiprocessfile:put:write_index" , trace = self .trace ):
302
+ assert os .pwrite (self .cache_fd , index_entry , index_ofst ) == self .buflen
303
+ with record ("pfio.cache.multiprocessfile:put:write_data" , trace = self .trace ):
300
304
assert os .pwrite (self .cache_fd , data , data_pos ) == len (data )
301
- os .fsync (self .cache_fd )
305
+ with record ("pfio.cache.multiprocessfile:put:sync" , trace = self .trace ):
306
+ os .fsync (self .cache_fd )
302
307
fcntl .flock (self .cache_fd , fcntl .LOCK_UN )
303
308
304
309
return True
0 commit comments