Skip to content

Commit 5fb0608

Browse files
committed
Rework concurrency semantics of valueLog.maxFid (hypermodeinc#1184)
1 parent 2a90c66 commit 5fb0608

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

value.go

+28-20
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32,
493493
}
494494

495495
func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
496-
maxFid := atomic.LoadUint32(&vlog.maxFid)
496+
vlog.filesLock.RLock()
497+
maxFid := vlog.maxFid
498+
vlog.filesLock.RUnlock()
497499
y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
498500
tr.LazyPrintf("Rewriting fid: %d", f.fid)
499501

@@ -808,10 +810,9 @@ func (vlog *valueLog) dropAll() (int, error) {
808810
}
809811

810812
vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0")
811-
if _, err := vlog.createVlogFile(0); err != nil {
813+
if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped.
812814
return count, err
813815
}
814-
atomic.StoreUint32(&vlog.maxFid, 0)
815816
return count, nil
816817
}
817818

@@ -832,12 +833,12 @@ type valueLog struct {
832833
// guards our view of which files exist, which to be deleted, how many active iterators
833834
filesLock sync.RWMutex
834835
filesMap map[uint32]*logFile
836+
maxFid uint32
835837
filesToBeDeleted []uint32
836838
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
837839
numActiveIterators int32
838840

839841
db *DB
840-
maxFid uint32 // accessed via atomics.
841842
writableLogOffset uint32 // read by read, written by write. Must access via atomics.
842843
numEntriesWritten uint32
843844
opt Options
@@ -1005,6 +1006,7 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
10051006

10061007
vlog.filesLock.Lock()
10071008
vlog.filesMap[fid] = lf
1009+
vlog.maxFid = fid
10081010
vlog.filesLock.Unlock()
10091011

10101012
return lf, nil
@@ -1155,12 +1157,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
11551157
// plain text mode or vice versa. A single vlog file can't have both
11561158
// encrypted entries and plain text entries.
11571159
if last.encryptionEnabled() != vlog.db.shouldEncrypt() {
1158-
newid := atomic.AddUint32(&vlog.maxFid, 1)
1160+
newid := vlog.maxFid + 1
11591161
_, err := vlog.createVlogFile(newid)
11601162
if err != nil {
11611163
return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid)
11621164
}
1163-
last, ok = vlog.filesMap[vlog.maxFid]
1165+
last, ok = vlog.filesMap[newid]
11641166
y.AssertTrue(ok)
11651167
}
11661168
lastOffset, err := last.fd.Seek(0, io.SeekEnd)
@@ -1222,7 +1224,7 @@ func (vlog *valueLog) Close() error {
12221224
err = munmapErr
12231225
}
12241226

1225-
maxFid := atomic.LoadUint32(&vlog.maxFid)
1227+
maxFid := vlog.maxFid
12261228
if !vlog.opt.ReadOnly && id == maxFid {
12271229
// truncate writable log file to correct offset.
12281230
if truncErr := f.fd.Truncate(
@@ -1320,7 +1322,7 @@ func (vlog *valueLog) sync(fid uint32) error {
13201322
}
13211323

13221324
vlog.filesLock.RLock()
1323-
maxFid := atomic.LoadUint32(&vlog.maxFid)
1325+
maxFid := vlog.maxFid
13241326
// During replay it is possible to get sync call with fid less than maxFid.
13251327
// Because older file has already been synced, we can return from here.
13261328
if fid < maxFid || len(vlog.filesMap) == 0 {
@@ -1353,7 +1355,7 @@ func (vlog *valueLog) write(reqs []*request) error {
13531355
return nil
13541356
}
13551357
vlog.filesLock.RLock()
1356-
maxFid := atomic.LoadUint32(&vlog.maxFid)
1358+
maxFid := vlog.maxFid
13571359
curlf := vlog.filesMap[maxFid]
13581360
vlog.filesLock.RUnlock()
13591361

@@ -1385,7 +1387,7 @@ func (vlog *valueLog) write(reqs []*request) error {
13851387
return err
13861388
}
13871389

1388-
newid := atomic.AddUint32(&vlog.maxFid, 1)
1390+
newid := vlog.maxFid + 1
13891391
y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid)
13901392
newlf, err := vlog.createVlogFile(newid)
13911393
if err != nil {
@@ -1446,28 +1448,33 @@ func (vlog *valueLog) write(reqs []*request) error {
14461448

14471449
// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
14481450
// (if non-nil)
1449-
func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
1451+
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
14501452
vlog.filesLock.RLock()
14511453
defer vlog.filesLock.RUnlock()
1452-
ret, ok := vlog.filesMap[fid]
1454+
ret, ok := vlog.filesMap[vp.Fid]
14531455
if !ok {
14541456
// log file has gone away, will need to retry the operation.
14551457
return nil, ErrRetry
14561458
}
1459+
1460+
// Check for valid offset if we are reading from writable log.
1461+
maxFid := vlog.maxFid
1462+
if vp.Fid == maxFid {
1463+
currentOffset := vlog.woffset()
1464+
if vp.Offset >= currentOffset {
1465+
return nil, errors.Errorf(
1466+
"Invalid value pointer offset: %d greater than current offset: %d",
1467+
vp.Offset, currentOffset)
1468+
}
1469+
}
1470+
14571471
ret.lock.RLock()
14581472
return ret, nil
14591473
}
14601474

14611475
// Read reads the value log at a given location.
14621476
// TODO: Make this read private.
14631477
func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
1464-
// Check for valid offset if we are reading from writable log.
1465-
maxFid := atomic.LoadUint32(&vlog.maxFid)
1466-
if vp.Fid == maxFid && vp.Offset >= vlog.woffset() {
1467-
return nil, nil, errors.Errorf(
1468-
"Invalid value pointer offset: %d greater than current offset: %d",
1469-
vp.Offset, vlog.woffset())
1470-
}
14711478
buf, lf, err := vlog.readValueBytes(vp, s)
14721479
// log file is locked so, decide whether to lock immediately or let the caller to
14731480
// unlock it, after caller uses it.
@@ -1517,10 +1524,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
15171524
// readValueBytes return vlog entry slice and read locked log file. Caller should take care of
15181525
// logFile unlocking.
15191526
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) {
1520-
lf, err := vlog.getFileRLocked(vp.Fid)
1527+
lf, err := vlog.getFileRLocked(vp)
15211528
if err != nil {
15221529
return nil, nil, err
15231530
}
1531+
15241532
buf, err := lf.read(vp, s)
15251533
return buf, lf, err
15261534
}

0 commit comments

Comments
 (0)