Skip to content

Commit

Permalink
Merge pull request #2 from risingwavelabs/feat/stats
Browse files Browse the repository at this point in the history
fix: fix a serious bug caused by misunderstanding of orderedmap
  • Loading branch information
arkbriar authored Jul 26, 2024
2 parents 171dda6 + b77498e commit 0916010
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 28 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ module github.com/risingwavelabs/filechannel
go 1.21.4

require (
github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/gofrs/flock v0.8.1
github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606
github.com/klauspost/compress v1.17.8
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 // indirect
golang.org/x/sys v0.15.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606 h1:wAlKAaIDI0lz5hiRVLhpPclpRm+Foqud9Aw+ujyNic0=
github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606/go.mod h1:4anVKuA54EQY/g+NGk+fB2QnIB+kUqRuQ1VkHSnsjmI=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
Expand All @@ -15,6 +15,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w=
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
51 changes: 26 additions & 25 deletions internal/filechannel/filechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package filechannel

import (
"bytes"
"cmp"
"context"
"encoding/binary"
"errors"
Expand All @@ -32,8 +33,8 @@ import (
"sync"
"time"

"github.com/elliotchance/orderedmap/v2"
"github.com/gofrs/flock"
"github.com/ironpark/skiplist"
"github.com/klauspost/compress/snappy"

"github.com/risingwavelabs/filechannel/internal/condvar"
Expand Down Expand Up @@ -155,7 +156,7 @@ type Iterator struct {
autoAck bool
readerIndex uint32
pendingAckCount int
pendingAck *orderedmap.OrderedMap[uint32, int]
pendingAck skiplist.SkipList[uint32, int]

lastErr error
buf *bytes.Buffer
Expand Down Expand Up @@ -294,7 +295,7 @@ func (it *Iterator) readNext() ([]byte, error) {
}

it.pendingAckCount++
it.pendingAck.Set(it.segmentIndex, it.pendingAck.GetOrDefault(it.segmentIndex, 0)+1)
it.pendingAck.Set(it.segmentIndex, getOrDefault(it.pendingAck, it.segmentIndex, 0)+1)
it.offset += uint64(MessageHeaderBinarySize) + uint64(it.buf.Len())
msg := it.buf.Bytes()

Expand All @@ -320,7 +321,7 @@ func (it *Iterator) Ack(n int) error {
for n > 0 {
if n >= f.Value {
n -= f.Value
it.pendingAck.Delete(f.Key)
it.pendingAck.Remove(f.Key())
f = it.pendingAck.Front()
} else {
f.Value -= n
Expand All @@ -330,7 +331,7 @@ func (it *Iterator) Ack(n int) error {

curSegmentIndex := it.segmentIndex
if f != nil {
curSegmentIndex = f.Key
curSegmentIndex = f.Key()
}

if curSegmentIndex > it.readerIndex {
Expand Down Expand Up @@ -444,7 +445,7 @@ func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Ite
readerIndex: reader,
buf: bytes.NewBuffer(make([]byte, 0, 4096)),
autoAck: autoAck,
pendingAck: orderedmap.NewOrderedMap[uint32, int](),
pendingAck: skiplist.New[uint32, int](cmp.Compare[uint32]),
}
}

Expand Down Expand Up @@ -512,8 +513,8 @@ type SegmentManager struct {

readMu sync.RWMutex
readCond *condvar.Cond
pinFreq *orderedmap.OrderedMap[uint32, int]
readFreq *orderedmap.OrderedMap[uint32, int]
pinFreq skiplist.SkipList[uint32, int]
readFreq skiplist.SkipList[uint32, int]
maxReadIndex int64
watermark uint32
}
Expand All @@ -531,7 +532,7 @@ func (sm *SegmentManager) NewReader() uint32 {
defer sm.readMu.Unlock()

beginIndex := sm.watermark
sm.readFreq.Set(beginIndex, sm.readFreq.GetOrDefault(beginIndex, 0)+1)
sm.readFreq.Set(beginIndex, getOrDefault(sm.readFreq, beginIndex, 0)+1)
return beginIndex
}

Expand All @@ -540,7 +541,7 @@ func (sm *SegmentManager) Pin(index uint32) bool {
defer sm.readMu.Unlock()

if index >= sm.watermark {
sm.pinFreq.Set(index, sm.pinFreq.GetOrDefault(index, 0)+1)
sm.pinFreq.Set(index, getOrDefault(sm.pinFreq, index, 0)+1)
return true
}

Expand All @@ -551,14 +552,14 @@ func (sm *SegmentManager) Unpin(index uint32) {
sm.readMu.Lock()
defer sm.readMu.Unlock()

v, ok := sm.pinFreq.Get(index)
v, ok := sm.pinFreq.GetValue(index)
if !ok {
panic("unpin non-existing segment")
}
isFront := sm.pinFreq.Front().Key == index
isFront := sm.pinFreq.Front().Key() == index

if v == 1 {
sm.pinFreq.Delete(index)
sm.pinFreq.Remove(index)
} else {
sm.pinFreq.Set(index, v-1)
}
Expand All @@ -576,11 +577,11 @@ func (sm *SegmentManager) updateWatermark() {
if readFreqEmpty && pinFreqEmpty {
sm.watermark = uint32(sm.maxReadIndex + 1)
} else if readFreqEmpty {
sm.watermark = sm.pinFreq.Front().Key
sm.watermark = sm.pinFreq.Front().Key()
} else if pinFreqEmpty {
sm.watermark = sm.readFreq.Front().Key
sm.watermark = sm.readFreq.Front().Key()
} else {
sm.watermark = min(sm.pinFreq.Front().Key, sm.readFreq.Front().Key)
sm.watermark = min(sm.pinFreq.Front().Key(), sm.readFreq.Front().Key())
}

if prevWatermark != sm.watermark {
Expand All @@ -592,21 +593,21 @@ func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint
sm.readMu.Lock()
defer sm.readMu.Unlock()

v, ok := sm.readFreq.Get(prev)
v, ok := sm.readFreq.GetValue(prev)
if !ok {
panic("advancing a non-existing reader")
}
isFront := sm.readFreq.Front().Key == prev
isFront := sm.readFreq.Front().Key() == prev

if v == 1 {
sm.readFreq.Delete(prev)
sm.readFreq.Remove(prev)
} else {
sm.readFreq.Set(prev, v-1)
}

next := prev + delta
sm.maxReadIndex = max(int64(next-1), sm.maxReadIndex)
sm.readFreq.Set(next, sm.readFreq.GetOrDefault(next, 0)+1)
sm.readFreq.Set(next, getOrDefault(sm.readFreq, next, 0)+1)

// If the minimum reader index is deleted, there's a chance to
// advance the watermark.
Expand All @@ -621,14 +622,14 @@ func (sm *SegmentManager) CloseReader(cur uint32) uint32 {
sm.readMu.Lock()
defer sm.readMu.Unlock()

v, ok := sm.readFreq.Get(cur)
v, ok := sm.readFreq.GetValue(cur)
if !ok {
panic("closing a non-existing reader")
}
isFront := sm.readFreq.Front().Key == cur
isFront := sm.readFreq.Front().Key() == cur

if v == 1 {
sm.readFreq.Delete(cur)
sm.readFreq.Remove(cur)
} else {
sm.readFreq.Set(cur, v-1)
}
Expand Down Expand Up @@ -724,8 +725,8 @@ func (sm *SegmentManager) IncSegmentIndex() uint32 {
func NewSegmentManager(dir string) *SegmentManager {
sm := &SegmentManager{
dir: dir,
readFreq: orderedmap.NewOrderedMap[uint32, int](),
pinFreq: orderedmap.NewOrderedMap[uint32, int](),
readFreq: skiplist.New[uint32, int](cmp.Compare[uint32]),
pinFreq: skiplist.New[uint32, int](cmp.Compare[uint32]),
maxReadIndex: -1,
}
cond := condvar.NewCond(sm.readMu.RLocker())
Expand Down
28 changes: 28 additions & 0 deletions internal/filechannel/filechannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,34 @@ func TestFileChannel_ReadCompressed(t *testing.T) {
assert.NoError(t, it.Close())
}

func TestFileChannel_ReadCompressed_HoldDeleted(t *testing.T) {
fc := setup(t, "test_file_channel_read_compressed", RotateThreshold(1<<20))
defer teardown(t, fc, true)

itCreatedBeforeCompression := fc.Iterator()

const payloadSize, totalSize = 128, 10 << 20
msgNum := totalSize / payloadSize
payload := magicPayload(payloadSize)

writeAll(t, fc, func(_ int) []byte { return payload }, msgNum)

// Wait until the first segment is compressed.
checkFileChannelDir(t, fc.dir, func(entries []os.DirEntry) bool {
return slices.ContainsFunc(entries, func(entry os.DirEntry) bool {
return entry.Name() == "segment.0.z"
})
}, 10*time.Second)

itCreatedAfterCompression := fc.Iterator()

readAll(t, itCreatedBeforeCompression, func(_ int) []byte { return payload }, msgNum, 10*time.Second)
readAll(t, itCreatedAfterCompression, func(_ int) []byte { return payload }, msgNum, 10*time.Second)

assert.NoError(t, itCreatedBeforeCompression.Close())
assert.NoError(t, itCreatedAfterCompression.Close())
}

func testFileChannelWithRandomStrings(t *testing.T, rand *rand.Rand, minLen, maxLen, size int, parallelism int, opts ...Option) {
fc := setup(t, fmt.Sprintf("file_channel_benchmark_random_%d_%d_%d", minLen, maxLen, size), opts...)
defer teardown(t, fc, false)
Expand Down
25 changes: 25 additions & 0 deletions internal/filechannel/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filechannel

import "github.com/ironpark/skiplist"

func getOrDefault[K, V any](sl skiplist.SkipList[K, V], key K, defaultValue V) V {
v, ok := sl.GetValue(key)
if !ok {
return defaultValue
}
return v
}

0 comments on commit 0916010

Please sign in to comment.