Skip to content

Commit

Permalink
Improve raft write performance by utilizing FSM Batching (#7527)
Browse files Browse the repository at this point in the history
* Start benchmark work

* Add batching FSM function

* dedupe some code

* Update dependency on chunking FSM

* fix raft external tests

* fix go.mod

* Add batching test

* uncomment test

* update raft deps

* update vendor

* Update physical/raft/fsm.go

Co-Authored-By: Michel Vocks <[email protected]>

* Update physical/raft/fsm.go
  • Loading branch information
briankassouf authored Oct 14, 2019
1 parent 29cdb19 commit 457df23
Show file tree
Hide file tree
Showing 30 changed files with 900 additions and 400 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ require (
github.com/hashicorp/go-memdb v1.0.2
github.com/hashicorp/go-msgpack v0.5.5
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-raftchunking v0.6.2
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a
github.com/hashicorp/go-rootcerts v1.0.1
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/go-syslog v1.0.0
github.com/hashicorp/go-uuid v1.0.1
github.com/hashicorp/golang-lru v0.5.3
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf
github.com/hashicorp/raft v1.1.1
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab
github.com/hashicorp/vault-plugin-auth-alicloud v0.5.2-0.20190814210027-93970f08f2ec
github.com/hashicorp/vault-plugin-auth-azure v0.5.2-0.20190814210035-08e00d801115
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
Expand Down Expand Up @@ -291,6 +292,8 @@ github.com/hashicorp/go-plugin v1.0.1 h1:4OtAfUGbnKC6yS48p0CtMX2oFYtzFZVv6rok3cR
github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY=
github.com/hashicorp/go-raftchunking v0.6.2 h1:imj6CVkwXj6VzgXZQvzS+fSrkbFCzlJ2t00F3PacnuU=
github.com/hashicorp/go-raftchunking v0.6.2/go.mod h1:cGlg3JtDy7qy6c/3Bu660Mic1JF+7lWqIwCFSb08fX0=
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a h1:FmnBDwGwlTgugDGbVxwV8UavqSMACbGrUpfc98yFLR4=
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a/go.mod h1:xbXnmKqX9/+RhPkJ4zrEx4738HacP72aaUPlT2RZ4sU=
github.com/hashicorp/go-retryablehttp v0.5.3 h1:QlWt0KvWT0lq8MFppF9tsJGF+ynG7ztc2KIPhzRGk7s=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-retryablehttp v0.5.4 h1:1BZvpawXoJCWX6pNtow9+rpEj+3itIlutiqnntI6jOE=
Expand Down Expand Up @@ -330,6 +333,8 @@ github.com/hashicorp/nomad/api v0.0.0-20190412184103-1c38ced33adf/go.mod h1:BDng
github.com/hashicorp/raft v1.0.1/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI=
github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17 h1:p+2EISNdFCnD9R+B4xCiqSn429MCFtvM41aHJDJ6qW4=
github.com/hashicorp/raft v1.1.2-0.20191002163536-9c6bd3e3eb17/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab h1:WzGMwlO1DvaC93SvVOBOKtn+nXGEDXapyJuaRV3/VaY=
github.com/hashicorp/raft-snapshot v1.0.2-0.20190827162939-8117efcc5aab/go.mod h1:5sL9eUn72lH5DzsFIJ9jaysITbHksSSszImWSOTC8Ic=
Expand Down
216 changes: 99 additions & 117 deletions physical/raft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
var _ physical.Backend = (*FSM)(nil)
var _ physical.Transactional = (*FSM)(nil)
var _ raft.FSM = (*FSM)(nil)
var _ raft.ConfigurationStore = (*FSM)(nil)
var _ raft.BatchingFSM = (*FSM)(nil)

type restoreCallback func(context.Context) error

Expand All @@ -75,7 +75,6 @@ type FSM struct {
l sync.RWMutex
path string
logger log.Logger
permitPool *physical.PermitPool
noopRestore bool

db *bolt.DB
Expand All @@ -88,7 +87,7 @@ type FSM struct {
// additional state in the backend.
storeLatestState bool

chunker *raftchunking.ChunkingConfigurationStore
chunker *raftchunking.ChunkingBatchingFSM
}

// NewFSM constructs a FSM using the given directory
Expand Down Expand Up @@ -159,9 +158,8 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
}

f := &FSM{
path: conf["path"],
logger: logger,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
path: conf["path"],
logger: logger,

db: boltDB,
latestTerm: latestTerm,
Expand All @@ -170,7 +168,7 @@ func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
storeLatestState: storeLatestState,
}

f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{
f.chunker = raftchunking.NewChunkingBatchingFSM(f, &FSMChunkStorage{
f: f,
ctx: context.Background(),
})
Expand Down Expand Up @@ -245,9 +243,6 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
func (f *FSM) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -260,9 +255,6 @@ func (f *FSM) Delete(ctx context.Context, path string) error {
func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -287,9 +279,6 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -324,9 +313,6 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand All @@ -340,9 +326,6 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())

f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -374,9 +357,6 @@ func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
// Transaction writes all the operations in the provided transaction to the bolt
// file.
func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
f.permitPool.Acquire()
defer f.permitPool.Release()

f.l.RLock()
defer f.l.RUnlock()

Expand Down Expand Up @@ -404,57 +384,98 @@ func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error
return err
}

// Apply will apply a log value to the FSM. This is called from the raft
// ApplyBatch will apply a set of logs to the FSM. This is called from the raft
// library.
func (f *FSM) Apply(log *raft.Log) interface{} {
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
if err != nil {
f.logger.Error("error proto unmarshaling log data", "error", err)
panic("error proto unmarshaling log data")
}
func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
if len(logs) == 0 {
return []interface{}{}
}

// Do the unmarshalling first so we don't hold locks
var latestConfiguration *ConfigurationValue
commands := make([]interface{}, 0, len(logs))
for _, log := range logs {
switch log.Type {
case raft.LogCommand:
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
if err != nil {
f.logger.Error("error proto unmarshaling log data", "error", err)
panic("error proto unmarshaling log data")
}
commands = append(commands, command)
case raft.LogConfiguration:
configuration := raft.DecodeConfiguration(log.Data)
config := raftConfigurationToProtoConfiguration(log.Index, configuration)

f.l.RLock()
defer f.l.RUnlock()
commands = append(commands, config)

// Update the latest configuration the fsm has received; we will
// store this after it has been committed to storage.
latestConfiguration = config

default:
panic(fmt.Sprintf("got unexpected log type: %d", log.Type))
}
}

// Only advance latest pointer if this log has a higher index value than
// what we have seen in the past.
var logIndex []byte
var err error
latestIndex, _ := f.LatestState()
if latestIndex.Index < log.Index {
lastLog := logs[len(logs)-1]
if latestIndex.Index < lastLog.Index {
logIndex, err = proto.Marshal(&IndexValue{
Term: log.Term,
Index: log.Index,
Term: lastLog.Term,
Index: lastLog.Index,
})
if err != nil {
f.logger.Error("unable to marshal latest index", "error", err)
panic("unable to marshal latest index")
}
}

f.l.RLock()
defer f.l.RUnlock()

err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
for _, op := range command.Operations {
var err error
switch op.OpType {
case putOp:
err = b.Put([]byte(op.Key), op.Value)
case deleteOp:
err = b.Delete([]byte(op.Key))
case restoreCallbackOp:
if f.restoreCb != nil {
// Kick off the restore callback function in a go routine
go f.restoreCb(context.Background())
for _, commandRaw := range commands {
switch command := commandRaw.(type) {
case *LogData:
for _, op := range command.Operations {
var err error
switch op.OpType {
case putOp:
err = b.Put([]byte(op.Key), op.Value)
case deleteOp:
err = b.Delete([]byte(op.Key))
case restoreCallbackOp:
if f.restoreCb != nil {
// Kick off the restore callback function in a go routine
go f.restoreCb(context.Background())
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
}
if err != nil {
return err
}
}

case *ConfigurationValue:
b := tx.Bucket(configBucketName)
configBytes, err := proto.Marshal(command)
if err != nil {
return err
}
if err := b.Put(latestConfigKey, configBytes); err != nil {
return err
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
}
if err != nil {
return err
}
}

// TODO: benchmark so we can know how much time this adds
if f.storeLatestState && len(logIndex) > 0 {
b := tx.Bucket(configBucketName)
err = b.Put(latestIndexKey, logIndex)
Expand All @@ -472,13 +493,32 @@ func (f *FSM) Apply(log *raft.Log) interface{} {

// If we advanced the latest value, update the in-memory representation too.
if len(logIndex) > 0 {
atomic.StoreUint64(f.latestTerm, log.Term)
atomic.StoreUint64(f.latestIndex, log.Index)
atomic.StoreUint64(f.latestTerm, lastLog.Term)
atomic.StoreUint64(f.latestIndex, lastLog.Index)
}

// If one or more configuration changes were processed, store the latest one.
if latestConfiguration != nil {
f.latestConfig.Store(latestConfiguration)
}

return &FSMApplyResponse{
Success: true,
// Build the responses. The logs array is used here to ensure we reply to
// all command values; even if they are not of the types we expect. This
// should future proof this function from more log types being provided.
resp := make([]interface{}, len(logs))
for i := range logs {
resp[i] = &FSMApplyResponse{
Success: true,
}
}

return resp
}

// Apply will apply a log value to the FSM. This is called from the raft
// library.
func (f *FSM) Apply(log *raft.Log) interface{} {
return f.ApplyBatch([]*raft.Log{log})[0]
}

type writeErrorCloser interface {
Expand Down Expand Up @@ -609,61 +649,6 @@ func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
// Release doesn't do anything.
func (s *noopSnapshotter) Release() {}

// StoreConfig satisfies the raft.ConfigurationStore interface and persists the
// latest raft server configuration to the bolt file.
func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) {
f.l.RLock()
defer f.l.RUnlock()

var indexBytes []byte
latestIndex, _ := f.LatestState()
// Only write the new index if we are advancing the pointer
if index > latestIndex.Index {
latestIndex.Index = index

var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
f.logger.Error("unable to marshal latest index", "error", err)
panic(fmt.Sprintf("unable to marshal latest index: %v", err))
}
}

protoConfig := raftConfigurationToProtoConfiguration(index, configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
f.logger.Error("unable to marshal config", "error", err)
panic(fmt.Sprintf("unable to marshal config: %v", err))
}

if f.storeLatestState {
err = f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(configBucketName)
err := b.Put(latestConfigKey, configBytes)
if err != nil {
return err
}

// TODO: benchmark so we can know how much time this adds
if len(indexBytes) > 0 {
err = b.Put(latestIndexKey, indexBytes)
if err != nil {
return err
}
}

return nil
})
if err != nil {
f.logger.Error("unable to store latest configuration", "error", err)
panic(fmt.Sprintf("unable to store latest configuration: %v", err))
}
}

f.witnessIndex(latestIndex)
f.latestConfig.Store(protoConfig)
}

// raftConfigurationToProtoConfiguration converts a raft configuration object to
// a proto value.
func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
Expand Down Expand Up @@ -722,9 +707,6 @@ func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error
Value: b,
}

f.f.permitPool.Acquire()
defer f.f.permitPool.Release()

f.f.l.RLock()
defer f.f.l.RUnlock()

Expand Down
Loading

0 comments on commit 457df23

Please sign in to comment.