diff --git a/lib/container/conque/blocking/queue.go b/lib/container/conque/blocking/queue.go index 77bdaed6f..2c127cd1d 100644 --- a/lib/container/conque/blocking/queue.go +++ b/lib/container/conque/blocking/queue.go @@ -45,7 +45,7 @@ func (q *Queue[T]) Close() { q.sem.Release() } -// Wait esures that all incoming Pushes observe that the queue is closed. +// Wait ensures that all incoming Pushes observe that the queue is closed. func (q *Queue[T]) Wait() { // Make sure no inflight Push. q.mu.Lock() diff --git a/lib/executor/multi_flow.go b/lib/executor/multi_flow.go index e48d3b221..82358a2f2 100644 --- a/lib/executor/multi_flow.go +++ b/lib/executor/multi_flow.go @@ -40,7 +40,7 @@ func (f *flow) Execute(t Task) bool { } if f.q.Push(t) { - f.mf.q.Push(f) + return f.mf.q.Push(f) } return true } diff --git a/server/store/meta/async.go b/server/store/meta/async.go index 1c994fd29..05436d576 100644 --- a/server/store/meta/async.go +++ b/server/store/meta/async.go @@ -111,7 +111,7 @@ func (s *AsyncStore) set(kvs Ranger) error { } err := kvs.Range(func(key []byte, value interface{}) error { - s.pending.Set(key, value) + set(s.pending, key, value) return nil }) if err != nil { @@ -190,12 +190,6 @@ func (s *AsyncStore) commit() { s.version = r.EO } -func merge(dst, src *skiplist.SkipList) { - for el := src.Front(); el != nil; el = el.Next() { - set(dst, el.Key().([]byte), el.Value) - } -} - func RecoverAsyncStore(ctx context.Context, dir string, opts ...walog.Option) (*AsyncStore, error) { committed, snapshot, err := recoverLatestSnapshot(ctx, dir, defaultCodec) if err != nil { diff --git a/server/store/meta/store.go b/server/store/meta/store.go index 0541cdddf..e59474038 100644 --- a/server/store/meta/store.go +++ b/server/store/meta/store.go @@ -51,11 +51,3 @@ func (s *store) load(key []byte) (interface{}, bool) { // func (s *store) delete(key []byte) { // set(s.committed, key, deletedMark) // } - -func set(m *skiplist.SkipList, key []byte, value interface{}) { - if 
value == DeletedMark { - m.Remove(key) - } else { - m.Set(key, value) - } -} diff --git a/server/store/meta/sync.go b/server/store/meta/sync.go index 609df1694..1bfb47fcc 100644 --- a/server/store/meta/sync.go +++ b/server/store/meta/sync.go @@ -126,11 +126,7 @@ func (s *SyncStore) set(ctx context.Context, kvs Ranger, cb StoreCallback) { // Update state. s.mu.Lock() _ = kvs.Range(func(key []byte, value interface{}) error { - if value == DeletedMark { - s.committed.Remove(key) - } else { - s.committed.Set(key, value) - } + update(s.committed, key, value) return nil }) s.version = r.EO @@ -174,7 +170,7 @@ func RecoverSyncStore(ctx context.Context, dir string, opts ...walog.Option) (*S walog.FromPosition(snapshot), walog.WithRecoveryCallback(func(data []byte, r walog.Range) error { err2 := defaultCodec.Unmarshal(data, func(key []byte, value interface{}) error { - set(committed, key, value) + rawUpdate(committed, key, value) return nil }) if err2 != nil { diff --git a/server/store/meta/utils.go b/server/store/meta/utils.go new file mode 100644 index 000000000..d4aab0083 --- /dev/null +++ b/server/store/meta/utils.go @@ -0,0 +1,54 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +import ( + // third-party libraries. 
+ "github.com/huandu/skiplist" +) + +func update(m *skiplist.SkipList, key []byte, value interface{}) { + if value == DeletedMark { + m.Remove(key) + return + } + + set(m, key, value) +} + +func set(m *skiplist.SkipList, key []byte, value interface{}) { + switch val := value.(type) { + case []byte: + // Make a copy to avoid modifying value outside. + bs := append([]byte{}, val...) + m.Set(key, bs) + default: + m.Set(key, value) + } +} + +func rawUpdate(m *skiplist.SkipList, key []byte, value interface{}) { + if value == DeletedMark { + m.Remove(key) + } else { + m.Set(key, value) + } +} + +func merge(dst, src *skiplist.SkipList) { + for el := src.Front(); el != nil; el = el.Next() { + rawUpdate(dst, el.Key().([]byte), el.Value) + } +} diff --git a/server/store/raft/block/engine.go b/server/store/raft/block/engine.go index 1d75bbb86..041f7e2a9 100644 --- a/server/store/raft/block/engine.go +++ b/server/store/raft/block/engine.go @@ -24,6 +24,7 @@ import ( // third-party libraries. "google.golang.org/grpc" + // first-party libraries. raftpb "github.com/vanus-labs/vanus/api/raft" vanus "github.com/vanus-labs/vanus/api/vsr" "github.com/vanus-labs/vanus/pkg/raft" diff --git a/server/store/raft/storage/compaction.go b/server/store/raft/storage/compaction.go index 09cc8ea12..241851183 100644 --- a/server/store/raft/storage/compaction.go +++ b/server/store/raft/storage/compaction.go @@ -65,7 +65,7 @@ func (s *Storage) Compact(ctx context.Context, i uint64) error { sz := i - ci remaining := s.length() - sz - sr := s.stableLength() - sz + sr := s.stableLength() - sz // stable remaining ents := make([]raftpb.Entry, 1, 1+remaining) offs := make([]int64, 1, 1+sr) @@ -77,7 +77,7 @@ func (s *Storage) Compact(ctx context.Context, i uint64) error { // Copy remained entries. if remaining != 0 { ents = append(ents, s.ents[sz+1:]...) - // NOTE: sr <= remaining + // NOTE: `sr` MUST NOT greater than `remaining` (sr <= remaining). if sr != 0 { offs = append(offs, s.offs[sz+1:]...) 
offs[0] = offs[1] @@ -107,21 +107,7 @@ func (w *WAL) tryCompact(ctx context.Context, nodeID vanus.ID, offset, last, tai term: term, }, } - - w.closeMu.RLock() - defer w.closeMu.RUnlock() - select { - case <-w.closeC: - return ErrClosed - default: - } - - select { - case w.compactC <- task.compact: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return w.dispatchCompactTask(ctx, task.compact) } func (w *WAL) markBarrier(ctx context.Context, nodeID vanus.ID, offset int64) error { @@ -129,21 +115,7 @@ func (w *WAL) markBarrier(ctx context.Context, nodeID vanus.ID, offset int64) er nodeID: nodeID, offset: offset, } - - w.closeMu.RLock() - defer w.closeMu.RUnlock() - select { - case <-w.closeC: - return ErrClosed - default: - } - - select { - case w.compactC <- task.compact: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return w.dispatchCompactTask(ctx, task.compact) } func (w *WAL) removeBarrier(ctx context.Context, nodeID vanus.ID, offset int64) error { @@ -151,21 +123,7 @@ func (w *WAL) removeBarrier(ctx context.Context, nodeID vanus.ID, offset int64) nodeID: nodeID, last: offset, } - - w.closeMu.RLock() - defer w.closeMu.RUnlock() - select { - case <-w.closeC: - return ErrClosed - default: - } - - select { - case w.compactC <- task.compact: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return w.dispatchCompactTask(ctx, task.compact) } type compactTask struct { @@ -174,7 +132,7 @@ type compactTask struct { info compactInfo } -func (t *compactTask) compact(w *WAL, cCtx *compactContext) { +func (t *compactTask) compact(w *WAL, cc *compactContext) { // node is deleted. if !w.nodes[t.nodeID] { return @@ -188,71 +146,51 @@ func (t *compactTask) compact(w *WAL, cCtx *compactContext) { if t.offset != 0 { w.barrier.Set(t.offset, t.nodeID) } - if t.tail > cCtx.tail { - cCtx.tail = t.tail + if t.tail > cc.tail { + cc.tail = t.tail } // Set compaction info. 
if !t.info.empty() { - cCtx.infos[t.nodeID] = t.info + cc.infos[t.nodeID] = t.info } } -func (w *WAL) addNode(ctx context.Context, nodeID vanus.ID) error { - ch := make(chan error) - task := adminTask{ - nodeID: nodeID, - ch: ch, - } - - w.closeMu.RLock() - select { - case <-w.closeC: - default: - } +var emptyCompact = make([]byte, 16) - select { - case w.compactC <- task.addNode: - case <-ctx.Done(): - w.closeMu.RUnlock() - return ctx.Err() - } - w.closeMu.RUnlock() +func (w *WAL) addNode(ctx context.Context, nodeID vanus.ID) error { + return w.invokeCompactTask(ctx, func(w *WAL, _ *compactContext, ch chan<- error) { + w.nodes[nodeID] = true - select { - case err := <-ch: - return err - case <-ctx.Done(): - return ctx.Err() - } + key := []byte(CompactKey(nodeID.Uint64())) + w.stateStore.Store(context.Background(), key, emptyCompact, func(err error) { + if err != nil { + ch <- err + } + close(ch) + }) + }) } func (w *WAL) removeNode(ctx context.Context, nodeID vanus.ID) error { - ch := make(chan error) - task := adminTask{ - nodeID: nodeID, - ch: ch, - } - - w.closeMu.RLock() - select { - case <-w.closeC: - default: - } + return w.invokeCompactTask(ctx, func(w *WAL, cc *compactContext, ch chan<- error) { + // Prevent compact on node. + w.nodes[nodeID] = false + delete(cc.infos, nodeID) + + w.stateStore.Delete(context.Background(), []byte(CompactKey(nodeID.Uint64())), func(err error) { + if err != nil { + // TODO(james.yin): handle error. + panic(err) + } - select { - case w.compactC <- task.removeNode: - case <-ctx.Done(): - w.closeMu.RUnlock() - return ctx.Err() - } - w.closeMu.RUnlock() + close(ch) - select { - case err := <-ch: - return err - case <-ctx.Done(): - return ctx.Err() - } + // Clean node to delete WAL. 
+ _ = w.dispatchCompactTask(context.TODO(), func(w *WAL, cc *compactContext) { + delete(w.nodes, nodeID) + }) + }) + }) } func (w *WAL) recoverNode(nodeID vanus.ID, offset int64) { @@ -262,45 +200,6 @@ func (w *WAL) recoverNode(nodeID vanus.ID, offset int64) { } } -type adminTask struct { - nodeID vanus.ID - ch chan<- error -} - -var emptyCompact = make([]byte, 16) - -func (t *adminTask) addNode(w *WAL, _ *compactContext) { - w.nodes[t.nodeID] = true - - key := []byte(CompactKey(t.nodeID.Uint64())) - w.stateStore.Store(context.Background(), key, emptyCompact, func(err error) { - if err != nil { - t.ch <- err - } - close(t.ch) - }) -} - -func (t *adminTask) removeNode(w *WAL, cCtx *compactContext) { - // Prevent compact on node. - w.nodes[t.nodeID] = false - delete(cCtx.infos, t.nodeID) - - w.stateStore.Delete(context.Background(), []byte(CompactKey(t.nodeID.Uint64())), func(err error) { - if err != nil { - // TODO(james.yin): handle error. - panic(err) - } - - close(t.ch) - - // Clean node to delete WAL. - w.compactC <- (func(w *WAL, cc *compactContext) { - delete(w.nodes, t.nodeID) - }) - }) -} - type compactInfo struct { index, term uint64 } @@ -311,6 +210,7 @@ func (ci *compactInfo) empty() bool { type logCompactInfos map[vanus.ID]compactInfo +// Make sure logCompactInfos implements meta.Ranger. var _ meta.Ranger = (logCompactInfos)(nil) func (i logCompactInfos) Range(cb meta.RangeCallback) error { @@ -331,6 +231,7 @@ type compactMeta struct { offset int64 } +// Make sure compactMeta implements meta.Ranger. var _ meta.Ranger = (*compactMeta)(nil) func (m *compactMeta) Range(cb meta.RangeCallback) error { @@ -381,61 +282,131 @@ func (c *compactContext) sync(ctx context.Context, stateStore *meta.SyncStore) b return true } +type compactFunc func(*WAL, *compactContext) + +type compactJob struct { + fn compactFunc +} + +func (j *compactJob) invoke(w *WAL, cc *compactContext) { + j.fn(w, cc) +} + +// runCompact processes all compact jobs in a single goroutine. 
func (w *WAL) runCompact() { ctx := context.Background() ticker := time.NewTicker(defaultCompactInterval) defer ticker.Stop() - cCtx := loadCompactContext(w.stateStore) + cc := loadCompactContext(w.stateStore) for { select { - case task := <-w.compactC: - task(w, cCtx) + case job := <-w.compactC: + job.invoke(w, cc) case <-ticker.C: - w.doCompact(ctx, cCtx) + w.doCompact(ctx, cc) case <-w.closeC: close(w.compactC) - for task := range w.compactC { - task(w, cCtx) + for job := range w.compactC { + job.invoke(w, cc) } - w.doCompact(ctx, cCtx) + w.doCompact(ctx, cc) close(w.doneC) return } } } -func (w *WAL) doCompact(ctx context.Context, cCtx *compactContext) { +func (w *WAL) doCompact(ctx context.Context, cc *compactContext) { + w.reconcileBarrier(cc) + + if cc.stale() { + log.Debug(ctx). + Int64("offset", cc.toCompact). + Msg("compact WAL of raft storage.") + + // Store compacted info and offset. + if cc.sync(ctx, w.stateStore) { + // Compact underlying WAL. + _ = w.WAL.Compact(ctx, cc.compacted) + } + } +} + +// reconcileBarrier scans barriers and calculates compactContext.toCompact. +func (w *WAL) reconcileBarrier(cc *compactContext) { for { front := w.barrier.Front() // No log entry in WAL. if front == nil { - cCtx.toCompact = cCtx.tail - break + cc.toCompact = cc.tail + return } + // Remove barrier if node is deleted. if _, ok := w.nodes[front.Value.(vanus.ID)]; !ok { w.barrier.RemoveElement(front) continue } offset, _ := front.Key().(int64) - cCtx.toCompact = offset - break + cc.toCompact = offset + return } +} - if cCtx.stale() { - log.Debug(ctx). - Int64("offset", cCtx.toCompact). - Msg("compact WAL of raft storage.") +// dispatchCompactJob dispatches a compact job to the compact goroutine. +func (w *WAL) dispatchCompactJob(ctx context.Context, job compactJob) error { + // NOTE: no panic, avoid unlocking with defer. + w.closeMu.RLock() - // Store compacted info and offset. - if cCtx.sync(ctx, w.stateStore) { - // Compact underlying WAL. 
- _ = w.WAL.Compact(ctx, cCtx.compacted) - } + select { + case <-w.closeC: + w.closeMu.RUnlock() + return ErrClosed + default: + } + + select { + case w.compactC <- job: + w.closeMu.RUnlock() + return nil + case <-ctx.Done(): + w.closeMu.RUnlock() + return ctx.Err() + } +} + +// dispatchCompactTask dispatches a compact task to the compact goroutine. +func (w *WAL) dispatchCompactTask(ctx context.Context, task compactFunc) error { + job := compactJob{ + fn: task, + } + return w.dispatchCompactJob(ctx, job) +} + +type awaitableCompactFunc func(*WAL, *compactContext, chan<- error) + +// invokeCompactTask invokes a compact task and waits for its completion. +func (w *WAL) invokeCompactTask(ctx context.Context, task awaitableCompactFunc) error { + ch := make(chan error) + job := compactJob{ + fn: func(w *WAL, cc *compactContext) { + task(w, cc, ch) + }, + } + + if err := w.dispatchCompactJob(ctx, job); err != nil { + return err + } + + select { + case err := <-ch: + return err + case <-ctx.Done(): + return ctx.Err() } } diff --git a/server/store/raft/storage/log_storage.go b/server/store/raft/storage/log_storage.go index fcc5487a9..842b043b9 100644 --- a/server/store/raft/storage/log_storage.go +++ b/server/store/raft/storage/log_storage.go @@ -38,6 +38,7 @@ type logStorage struct { // ents[0] is a dummy entry, which record compact information. // ents[i] has raft log position i+snapshot.Metadata.Index. ents []raftpb.Entry + // offs[0] is a dummy entry, which records last offset where the barrier was set. // offs[i] is the start offset of ents[i] in WAL. 
offs []int64 @@ -193,16 +194,13 @@ func (s *Storage) Append(ctx context.Context, entries []raftpb.Entry, cb AppendC return } - term := entries[0].Term - index := entries[0].Index - rindex := index + uint64(len(entries)) - 1 // entries[len(entries)-1].Index + index, term := entries[0].Index, entries[0].Term + rightIndex := index + uint64(len(entries)) - 1 // entries[len(entries)-1].Index - firstIndex := s.firstIndex() - lastTerm := s.lastTerm() - lastIndex := s.lastIndex() + firstIndex, lastIndex, lastTerm := s.firstIndex(), s.lastIndex(), s.lastTerm() expectedIndex := lastIndex + 1 - if expectedIndex < index { + logError := func(msg string) { log.Error(ctx). Stringer("node_id", s.nodeID). Uint64("first_index", firstIndex). @@ -210,13 +208,17 @@ func (s *Storage) Append(ctx context.Context, entries []raftpb.Entry, cb AppendC Uint64("last_index", lastIndex). Uint64("next_term", term). Uint64("next_index", index). - Msg("Missing log entries.") + Msg(msg) + } + + if expectedIndex < index { + logError("Missing log entries.") cb(AppendResult{}, ErrBadEntry) return } // Shortcut if there is no new entry. - if rindex < firstIndex { + if rightIndex < firstIndex { cb(AppendResult{}, ErrCompacted) return } @@ -224,19 +226,11 @@ func (s *Storage) Append(ctx context.Context, entries []raftpb.Entry, cb AppendC // Truncate compacted entries. if index < firstIndex { entries = entries[firstIndex-index:] - term = entries[0].Term - index = entries[0].Index + index, term = entries[0].Index, entries[0].Term } if term < lastTerm { - log.Error(ctx). - Stringer("node_id", s.nodeID). - Uint64("first_index", firstIndex). - Uint64("last_term", lastTerm). - Uint64("last_index", lastIndex). - Uint64("next_term", term). - Uint64("next_index", index). 
- Msg("Term roll back.") + logError("Term roll back.") cb(AppendResult{}, ErrBadEntry) return } @@ -296,16 +290,10 @@ func (s *Storage) Append(ctx context.Context, entries []raftpb.Entry, cb AppendC } func (s *Storage) postAppend(entries []raftpb.Entry, offsets []int64, tail int64, remark bool, cb AppendCallback) { - end := len(entries) - 1 - for ; end >= 0; end-- { - e := &entries[end] - gt, err := s.term(e.Index) - if err == nil && gt == e.Term { - break - } - } + entries = s.truncateObsoleteEntries(entries) - if end < 0 { + n := len(entries) + if n == 0 { if remark { // Remove obsolete barrier from truncated entry. _ = s.wal.removeBarrier(context.TODO(), s.nodeID, offsets[0]) @@ -321,13 +309,14 @@ func (s *Storage) postAppend(entries []raftpb.Entry, offsets []int64, tail int64 if index == li+1 { // append - s.offs = append(s.offs, offsets[:end+1]...) + s.offs = append(s.offs, offsets[:n]...) } else { // truncate then append: term > lastTerm after := index - fi + 1 s.offs = append([]int64{}, s.offs[:after]...) - s.offs = append(s.offs, offsets[:end+1]...) + s.offs = append(s.offs, offsets[:n]...) } + // FIXME(james.yin): real? s.tail = tail if remark { @@ -336,17 +325,28 @@ func (s *Storage) postAppend(entries []raftpb.Entry, offsets []int64, tail int64 _ = s.wal.removeBarrier(context.TODO(), s.nodeID, s.offs[0]) } - // Record barrier. + // Record new barrier. 
s.offs[0] = s.offs[1] } - e := &entries[end] + e := &entries[n-1] cb(AppendResult{ Term: e.Term, Index: e.Index, }, nil) } +func (s *Storage) truncateObsoleteEntries(entries []raftpb.Entry) []raftpb.Entry { + for end := len(entries) - 1; end >= 0; end-- { + e := &entries[end] + term, err := s.term(e.Index) + if err == nil && term == e.Term { + return entries[:end+1] + } + } + return entries[:0] +} + func (s *Storage) prepareAppend(ctx context.Context, entries []raftpb.Entry) error { for i := 1; i < len(entries); i++ { entry, prev := &entries[i], &entries[i-1] diff --git a/server/store/raft/storage/recovery.go b/server/store/raft/storage/recovery.go index 317892898..9b2877f4a 100644 --- a/server/store/raft/storage/recovery.go +++ b/server/store/raft/storage/recovery.go @@ -49,6 +49,7 @@ const ( ) func (sb *storagesBuilder) onMeta(key []byte, value interface{}) error { + // Filter compact key. if len(key) < minCompactKeyLen || !bytes.Equal(key[len(key)-compactSuffixLen:], compactSuffix) { return nil } @@ -145,12 +146,6 @@ func Recover( return storages, wal2, nil } -func RecoverStorage( - nodeID vanus.ID, wal *WAL, stateStore *meta.SyncStore, hintStore *meta.AsyncStore, snapOp SnapshotOperator, -) (*Storage, error) { - return recoverStorage(nodeID, wal, stateStore, hintStore, nil, snapOp) -} - func recoverStorage( nodeID vanus.ID, wal *WAL, stateStore *meta.SyncStore, hintStore *meta.AsyncStore, info []byte, snapOp SnapshotOperator, diff --git a/server/store/raft/storage/storage.go b/server/store/raft/storage/storage.go index 301774cef..fad2cc017 100644 --- a/server/store/raft/storage/storage.go +++ b/server/store/raft/storage/storage.go @@ -30,9 +30,7 @@ import ( ) type Storage struct { - // Protects access to all fields. Most methods of Log are - // run on the raft goroutine, but Append() and Compact() is run on an - // application goroutine. + // Protects access to raft states. 
mu sync.RWMutex nodeID vanus.ID @@ -41,11 +39,11 @@ type Storage struct { logStorage snapshotStorage - // AppendExecutor is the Executor that executes Append, postAppend and Compact. + // AppendExecutor is the Executor that executes Append, postAppend, and Compact. AppendExecutor executor.Executor } -// Make sure Log implements raft.Storage. +// Make sure Storage implements raft.Storage. var _ raft.Storage = (*Storage)(nil) // NewStorage creates an empty Storage. @@ -85,7 +83,7 @@ func newStorage( return s } -// Delete discard all data of Storage. +// Delete discards all data of Storage. // NOTE: waiting for inflight append calls is the responsibility of the caller. func (s *Storage) Delete(ctx context.Context) { if err := s.wal.removeNode(ctx, s.nodeID); err != nil { diff --git a/server/store/raft/storage/wal.go b/server/store/raft/storage/wal.go index 9565a2a34..7c9091c11 100644 --- a/server/store/raft/storage/wal.go +++ b/server/store/raft/storage/wal.go @@ -30,21 +30,25 @@ import ( ) const ( - defaultCompactTaskBufferSize = 256 + defaultCompactJobBufferSize = 256 ) +// WAL wraps the underlying walog.WAL and provides compaction capability. +// All compact tasks are processed in WAL.runCompact by a single goroutine. type WAL struct { *walog.WAL stateStore *meta.SyncStore - nodes map[vanus.ID]bool - barrier *skiplist.SkipList - compactC chan func(*WAL, *compactContext) + nodes map[vanus.ID]bool + barrier *skiplist.SkipList + compactC chan compactJob + // closeMu prevents concurrent writing to compactC and closing closeC. closeMu sync.RWMutex - closeC chan struct{} - doneC chan struct{} + // closeC prevents writing to compactC after close.
+ closeC chan struct{} + doneC chan struct{} } func newWAL(wal *walog.WAL, stateStore *meta.SyncStore, startCompaction bool) *WAL { @@ -53,7 +57,7 @@ func newWAL(wal *walog.WAL, stateStore *meta.SyncStore, startCompaction bool) *W stateStore: stateStore, nodes: make(map[vanus.ID]bool), barrier: skiplist.New(skiplist.Int64), - compactC: make(chan func(*WAL, *compactContext), defaultCompactTaskBufferSize), + compactC: make(chan compactJob, defaultCompactJobBufferSize), closeC: make(chan struct{}), doneC: make(chan struct{}), } @@ -71,12 +75,13 @@ func (w *WAL) startCompaction() { func (w *WAL) Close() { w.WAL.Close() + go func() { w.WAL.Wait() w.closeMu.Lock() - defer w.closeMu.Unlock() close(w.closeC) + w.closeMu.Unlock() }() } diff --git a/server/store/wal/wal.go b/server/store/wal/wal.go index 9b688dfa9..e25873a37 100644 --- a/server/store/wal/wal.go +++ b/server/store/wal/wal.go @@ -46,6 +46,7 @@ type AppendOneCallback = func(Range, error) type AppendCallback = func([]Range, error) // WAL is write-ahead log. +// All append tasks are processed in WAL.runAppend by a single goroutine. type WAL struct { sf *segmentedfile.SegmentedFile s stream.Stream @@ -176,7 +177,7 @@ func (w *WAL) runAppend() { w.appendQ.Wait() - // Invoke remaind tasks in w.appendQ. + // Invoke remaining tasks in w.appendQ. for { task, ok := w.appendQ.RawPop() if !ok {