Skip to content

Commit

Permalink
Rework concurrency semantics of valueLog.maxFid (dgraph-io#1184) (dgr…
Browse files Browse the repository at this point in the history
…aph-io#1187)

Move all access to `valueLog.maxFid` under `valueLog.filesLock`, while
all mutations happen either with writes stopped or sequentially under
valueLog.write.

Fixes a concurrency issue in `valueLog.Read` where the maxFid variable
and the `writableLogOffset` variable could point to two different log files.
  • Loading branch information
damz authored and Ibrahim Jarif committed Jan 27, 2020
1 parent 4676ca9 commit 3e25d77
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32,
}

func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
maxFid := atomic.LoadUint32(&vlog.maxFid)
vlog.filesLock.RLock()
maxFid := vlog.maxFid
vlog.filesLock.RUnlock()
y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
tr.LazyPrintf("Rewriting fid: %d", f.fid)

Expand Down Expand Up @@ -808,10 +810,9 @@ func (vlog *valueLog) dropAll() (int, error) {
}

vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0")
if _, err := vlog.createVlogFile(0); err != nil {
if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped.
return count, err
}
atomic.StoreUint32(&vlog.maxFid, 0)
return count, nil
}

Expand All @@ -832,12 +833,12 @@ type valueLog struct {
// guards our view of which files exist, which to be deleted, how many active iterators
filesLock sync.RWMutex
filesMap map[uint32]*logFile
maxFid uint32
filesToBeDeleted []uint32
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
numActiveIterators int32

db *DB
maxFid uint32 // accessed via atomics.
writableLogOffset uint32 // read by read, written by write. Must access via atomics.
numEntriesWritten uint32
opt Options
Expand Down Expand Up @@ -997,14 +998,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil {
return nil, errFile(err, lf.path, "Mmap value log file")
}

vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.maxFid = fid
// writableLogOffset is only written by write func, by read by Read func.
// To avoid a race condition, all reads and updates to this variable must be
// done via atomics.
atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
vlog.numEntriesWritten = 0

vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.filesLock.Unlock()

return lf, nil
Expand Down Expand Up @@ -1155,12 +1157,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
// plain text mode or vice versa. A single vlog file can't have both
// encrypted entries and plain text entries.
if last.encryptionEnabled() != vlog.db.shouldEncrypt() {
newid := atomic.AddUint32(&vlog.maxFid, 1)
newid := vlog.maxFid + 1
_, err := vlog.createVlogFile(newid)
if err != nil {
return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid)
}
last, ok = vlog.filesMap[vlog.maxFid]
last, ok = vlog.filesMap[newid]
y.AssertTrue(ok)
}
lastOffset, err := last.fd.Seek(0, io.SeekEnd)
Expand Down Expand Up @@ -1222,7 +1224,7 @@ func (vlog *valueLog) Close() error {
err = munmapErr
}

maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
if !vlog.opt.ReadOnly && id == maxFid {
// truncate writable log file to correct offset.
if truncErr := f.fd.Truncate(
Expand Down Expand Up @@ -1320,7 +1322,7 @@ func (vlog *valueLog) sync(fid uint32) error {
}

vlog.filesLock.RLock()
maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
// During replay it is possible to get sync call with fid less than maxFid.
// Because older file has already been synced, we can return from here.
if fid < maxFid || len(vlog.filesMap) == 0 {
Expand Down Expand Up @@ -1353,7 +1355,7 @@ func (vlog *valueLog) write(reqs []*request) error {
return nil
}
vlog.filesLock.RLock()
maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
vlog.filesLock.RUnlock()

Expand Down Expand Up @@ -1385,7 +1387,7 @@ func (vlog *valueLog) write(reqs []*request) error {
return err
}

newid := atomic.AddUint32(&vlog.maxFid, 1)
newid := vlog.maxFid + 1
y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid)
newlf, err := vlog.createVlogFile(newid)
if err != nil {
Expand Down Expand Up @@ -1446,28 +1448,33 @@ func (vlog *valueLog) write(reqs []*request) error {

// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
// (if non-nil)
func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
ret, ok := vlog.filesMap[fid]
ret, ok := vlog.filesMap[vp.Fid]
if !ok {
// log file has gone away, will need to retry the operation.
return nil, ErrRetry
}

// Check for valid offset if we are reading from writable log.
maxFid := vlog.maxFid
if vp.Fid == maxFid {
currentOffset := vlog.woffset()
if vp.Offset >= currentOffset {
return nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, currentOffset)
}
}

ret.lock.RLock()
return ret, nil
}

// Read reads the value log at a given location.
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
// Check for valid offset if we are reading from writable log.
maxFid := atomic.LoadUint32(&vlog.maxFid)
if vp.Fid == maxFid && vp.Offset >= vlog.woffset() {
return nil, nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, vlog.woffset())
}
buf, lf, err := vlog.readValueBytes(vp, s)
// log file is locked so, decide whether to lock immediately or let the caller to
// unlock it, after caller uses it.
Expand Down Expand Up @@ -1517,10 +1524,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
// readValueBytes return vlog entry slice and read locked log file. Caller should take care of
// logFile unlocking.
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) {
lf, err := vlog.getFileRLocked(vp.Fid)
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
}

buf, err := lf.read(vp, s)
return buf, lf, err
}
Expand Down

0 comments on commit 3e25d77

Please sign in to comment.