diff --git a/value.go b/value.go index d6d6d79ba..1f1c9434e 100644 --- a/value.go +++ b/value.go @@ -493,7 +493,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32, } func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { - maxFid := atomic.LoadUint32(&vlog.maxFid) + vlog.filesLock.RLock() + maxFid := vlog.maxFid + vlog.filesLock.RUnlock() y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid) tr.LazyPrintf("Rewriting fid: %d", f.fid) @@ -808,10 +810,9 @@ func (vlog *valueLog) dropAll() (int, error) { } vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0") - if _, err := vlog.createVlogFile(0); err != nil { + if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped. return count, err } - atomic.StoreUint32(&vlog.maxFid, 0) return count, nil } @@ -832,12 +833,12 @@ type valueLog struct { // guards our view of which files exist, which to be deleted, how many active iterators filesLock sync.RWMutex filesMap map[uint32]*logFile + maxFid uint32 filesToBeDeleted []uint32 // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted. numActiveIterators int32 db *DB - maxFid uint32 // accessed via atomics. writableLogOffset uint32 // read by read, written by write. Must access via atomics. numEntriesWritten uint32 opt Options @@ -997,14 +998,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) { if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil { return nil, errFile(err, lf.path, "Mmap value log file") } + + vlog.filesLock.Lock() + vlog.filesMap[fid] = lf + vlog.maxFid = fid // writableLogOffset is only written by write func, by read by Read func. // To avoid a race condition, all reads and updates to this variable must be // done via atomics. atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize) vlog.numEntriesWritten = 0 - - vlog.filesLock.Lock() - vlog.filesMap[fid] = lf vlog.filesLock.Unlock() return lf, nil @@ -1155,12 +1157,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { // plain text mode or vice versa. A single vlog file can't have both // encrypted entries and plain text entries. if last.encryptionEnabled() != vlog.db.shouldEncrypt() { - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 _, err := vlog.createVlogFile(newid) if err != nil { return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid) } - last, ok = vlog.filesMap[vlog.maxFid] + last, ok = vlog.filesMap[newid] y.AssertTrue(ok) } lastOffset, err := last.fd.Seek(0, io.SeekEnd) @@ -1222,7 +1224,7 @@ func (vlog *valueLog) Close() error { err = munmapErr } - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid if !vlog.opt.ReadOnly && id == maxFid { // truncate writable log file to correct offset. if truncErr := f.fd.Truncate( @@ -1320,7 +1322,7 @@ func (vlog *valueLog) sync(fid uint32) error { } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid // During replay it is possible to get sync call with fid less than maxFid. // Because older file has already been synced, we can return from here. if fid < maxFid || len(vlog.filesMap) == 0 { @@ -1353,7 +1355,7 @@ func (vlog *valueLog) write(reqs []*request) error { return nil } vlog.filesLock.RLock() - maxFid := atomic.LoadUint32(&vlog.maxFid) + maxFid := vlog.maxFid curlf := vlog.filesMap[maxFid] vlog.filesLock.RUnlock() @@ -1385,7 +1387,7 @@ func (vlog *valueLog) write(reqs []*request) error { return err } - newid := atomic.AddUint32(&vlog.maxFid, 1) + newid := vlog.maxFid + 1 y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid) newlf, err := vlog.createVlogFile(newid) if err != nil { @@ -1446,14 +1448,26 @@ func (vlog *valueLog) write(reqs []*request) error { // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file // (if non-nil) -func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { +func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) { vlog.filesLock.RLock() defer vlog.filesLock.RUnlock() - ret, ok := vlog.filesMap[fid] + ret, ok := vlog.filesMap[vp.Fid] if !ok { // log file has gone away, will need to retry the operation. return nil, ErrRetry } + + // Check for valid offset if we are reading from writable log. + maxFid := vlog.maxFid + if vp.Fid == maxFid { + currentOffset := vlog.woffset() + if vp.Offset >= currentOffset { + return nil, errors.Errorf( + "Invalid value pointer offset: %d greater than current offset: %d", + vp.Offset, currentOffset) + } + } + ret.lock.RLock() return ret, nil } @@ -1461,13 +1475,6 @@ func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) { // Read reads the value log at a given location. // TODO: Make this read private. func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) { - // Check for valid offset if we are reading from writable log. - maxFid := atomic.LoadUint32(&vlog.maxFid) - if vp.Fid == maxFid && vp.Offset >= vlog.woffset() { - return nil, nil, errors.Errorf( - "Invalid value pointer offset: %d greater than current offset: %d", - vp.Offset, vlog.woffset()) - } buf, lf, err := vlog.readValueBytes(vp, s) // log file is locked so, decide whether to lock immediately or let the caller to // unlock it, after caller uses it. @@ -1517,10 +1524,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() { // readValueBytes return vlog entry slice and read locked log file. Caller should take care of // logFile unlocking. func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) { - lf, err := vlog.getFileRLocked(vp.Fid) + lf, err := vlog.getFileRLocked(vp) if err != nil { return nil, nil, err } + buf, err := lf.read(vp, s) return buf, lf, err }