Skip to content

Commit

Permalink
refactor: storage (#634)
Browse files Browse the repository at this point in the history
* feat(store): save copy of bytes in meta store
* fix: return false when push task to MultiFlow fails
* refactor(store): raft storage

Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor authored Oct 8, 2023
1 parent c9aff3e commit 8cdd729
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 236 deletions.
2 changes: 1 addition & 1 deletion lib/container/conque/blocking/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (q *Queue[T]) Close() {
q.sem.Release()
}

// Wait esures that all incoming Pushes observe that the queue is closed.
// Wait ensures that all incoming Pushes observe that the queue is closed.
func (q *Queue[T]) Wait() {
// Make sure no inflight Push.
q.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion lib/executor/multi_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (f *flow) Execute(t Task) bool {
}

if f.q.Push(t) {
f.mf.q.Push(f)
return f.mf.q.Push(f)
}
return true
}
Expand Down
8 changes: 1 addition & 7 deletions server/store/meta/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *AsyncStore) set(kvs Ranger) error {
}

err := kvs.Range(func(key []byte, value interface{}) error {
s.pending.Set(key, value)
set(s.pending, key, value)
return nil
})
if err != nil {
Expand Down Expand Up @@ -190,12 +190,6 @@ func (s *AsyncStore) commit() {
s.version = r.EO
}

func merge(dst, src *skiplist.SkipList) {
for el := src.Front(); el != nil; el = el.Next() {
set(dst, el.Key().([]byte), el.Value)
}
}

func RecoverAsyncStore(ctx context.Context, dir string, opts ...walog.Option) (*AsyncStore, error) {
committed, snapshot, err := recoverLatestSnapshot(ctx, dir, defaultCodec)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions server/store/meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,3 @@ func (s *store) load(key []byte) (interface{}, bool) {
// func (s *store) delete(key []byte) {
// set(s.committed, key, deletedMark)
// }

func set(m *skiplist.SkipList, key []byte, value interface{}) {
if value == DeletedMark {
m.Remove(key)
} else {
m.Set(key, value)
}
}
8 changes: 2 additions & 6 deletions server/store/meta/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ func (s *SyncStore) set(ctx context.Context, kvs Ranger, cb StoreCallback) {
// Update state.
s.mu.Lock()
_ = kvs.Range(func(key []byte, value interface{}) error {
if value == DeletedMark {
s.committed.Remove(key)
} else {
s.committed.Set(key, value)
}
update(s.committed, key, value)
return nil
})
s.version = r.EO
Expand Down Expand Up @@ -174,7 +170,7 @@ func RecoverSyncStore(ctx context.Context, dir string, opts ...walog.Option) (*S
walog.FromPosition(snapshot),
walog.WithRecoveryCallback(func(data []byte, r walog.Range) error {
err2 := defaultCodec.Unmarshal(data, func(key []byte, value interface{}) error {
set(committed, key, value)
rawUpdate(committed, key, value)
return nil
})
if err2 != nil {
Expand Down
54 changes: 54 additions & 0 deletions server/store/meta/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2022 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
// third-party libraries.
"github.com/huandu/skiplist"
)

func update(m *skiplist.SkipList, key []byte, value interface{}) {
if value == DeletedMark {
m.Remove(key)
return
}

set(m, key, value)
}

func set(m *skiplist.SkipList, key []byte, value interface{}) {
switch val := value.(type) {
case []byte:
// Make a copy to avoid modifying value outside.
bs := append([]byte{}, val...)
m.Set(key, bs)
default:
m.Set(key, value)
}
}

func rawUpdate(m *skiplist.SkipList, key []byte, value interface{}) {
if value == DeletedMark {
m.Remove(key)
} else {
m.Set(key, value)
}
}

func merge(dst, src *skiplist.SkipList) {
for el := src.Front(); el != nil; el = el.Next() {
rawUpdate(dst, el.Key().([]byte), el.Value)
}
}
1 change: 1 addition & 0 deletions server/store/raft/block/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// third-party libraries.
"google.golang.org/grpc"

// first-party libraries.
raftpb "github.com/vanus-labs/vanus/api/raft"
vanus "github.com/vanus-labs/vanus/api/vsr"
"github.com/vanus-labs/vanus/pkg/raft"
Expand Down
Loading

0 comments on commit 8cdd729

Please sign in to comment.