Skip to content

Commit

Permalink
Add tests, refactor storage, optimize start up time for raft
Browse files Browse the repository at this point in the history
  • Loading branch information
bubbajoe committed Jun 19, 2024
1 parent 54231db commit 6df96f2
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 211 deletions.
11 changes: 7 additions & 4 deletions internal/admin/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
func StartAdminAPI(
version string, conf *config.DGateConfig,
logger *zap.Logger, cs changestate.ChangeState,
) {
) error {

Check warning on line 25 in internal/admin/admin_api.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_api.go#L25

Added line #L25 was not covered by tests
if conf.AdminConfig == nil {
logger.Warn("Admin API is disabled")
return
return nil

Check warning on line 28 in internal/admin/admin_api.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_api.go#L28

Added line #L28 was not covered by tests
}

mux := chi.NewRouter()
configureRoutes(mux, version,
logger.Named("routes"), cs, conf)
if err := configureRoutes(mux, version, logger.Named("routes"), cs, conf); err != nil {
return err

Check warning on line 33 in internal/admin/admin_api.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_api.go#L32-L33

Added lines #L32 - L33 were not covered by tests
}

// Start HTTP Server
go func() {
Expand All @@ -44,6 +45,7 @@ func StartAdminAPI(
ErrorLog: zap.NewStdLog(adminHttpLogger),
}
if err := server.ListenAndServe(); err != nil {
logger.Error("Error starting admin api", zap.Error(err))

Check warning on line 48 in internal/admin/admin_api.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_api.go#L48

Added line #L48 was not covered by tests
panic(err)
}
}()
Expand Down Expand Up @@ -143,4 +145,5 @@ func StartAdminAPI(
}()
}
}
return nil

Check warning on line 148 in internal/admin/admin_api.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_api.go#L148

Added line #L148 was not covered by tests
}
103 changes: 77 additions & 26 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,40 @@ import (
"go.uber.org/zap"
)

type dgateAdminFSM struct {
cs changestate.ChangeState
logger *zap.Logger
type AdminFSM struct {
cs changestate.ChangeState
storage raft.StableStore
logger *zap.Logger

localState *saveState
}

var _ raft.BatchingFSM = (*dgateAdminFSM)(nil)
var _ raft.BatchingFSM = (*AdminFSM)(nil)

type saveState struct {
AppliedIndex uint64 `json:"aindex"`
}

func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdminFSM {
return &dgateAdminFSM{cs, logger}
func newAdminFSM(
logger *zap.Logger,
storage raft.StableStore,
cs changestate.ChangeState,
) raft.FSM {
fsm := &AdminFSM{cs, storage, logger, &saveState{}}
stateBytes, err := storage.Get([]byte("prev_state"))
if err != nil {
logger.Error("error getting prev_state", zap.Error(err))
} else if len(stateBytes) != 0 {
if err = json.Unmarshal(stateBytes, &fsm.localState); err != nil {
logger.Warn("corrupted state detected", zap.ByteString("prev_state", stateBytes))
} else {
logger.Info("found state in store", zap.Any("prev_state", fsm.localState))

Check warning on line 41 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L32-L41

Added lines #L32 - L41 were not covered by tests
}
}
return fsm

Check warning on line 44 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L44

Added line #L44 was not covered by tests
}

func (fsm *dgateAdminFSM) applyLog(log *raft.Log, reload bool) (*spec.ChangeLog, error) {
func (fsm *AdminFSM) applyLog(log *raft.Log, reload bool) (*spec.ChangeLog, error) {

Check warning on line 47 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L47

Added line #L47 was not covered by tests
switch log.Type {
case raft.LogCommand:
var cl spec.ChangeLog
Expand Down Expand Up @@ -53,48 +75,77 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log, reload bool) (*spec.ChangeLog,
return nil, nil
}

func (fsm *dgateAdminFSM) Apply(log *raft.Log) any {
rft := fsm.cs.Raft()
fsm.logger.Debug("apply single log",
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
zap.Uint64("logIndex", log.Index),
)
_, err := fsm.applyLog(log, true)
return err
func (fsm *AdminFSM) Apply(log *raft.Log) any {
resps := fsm.ApplyBatch([]*raft.Log{log})
if len(resps) != 1 {
panic("apply batch not returning the correct number of responses")

Check warning on line 81 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}
return resps[0]

Check warning on line 83 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L83

Added line #L83 was not covered by tests
}

func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any {
func (fsm *AdminFSM) ApplyBatch(logs []*raft.Log) []any {

Check warning on line 86 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L86

Added line #L86 was not covered by tests
rft := fsm.cs.Raft()
lastIndex := len(logs) - 1
appliedIndex := rft.AppliedIndex()
lastLogIndex := logs[len(logs)-1].Index

Check warning on line 89 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L88-L89

Added lines #L88 - L89 were not covered by tests
fsm.logger.Debug("apply log batch",
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("applied", appliedIndex),

Check warning on line 91 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L91

Added line #L91 was not covered by tests
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
zap.Uint64("log[0]", logs[0].Index),
zap.Uint64("log[-1]", logs[lastIndex].Index),
zap.Uint64("log[-1]", lastLogIndex),

Check warning on line 95 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L95

Added line #L95 was not covered by tests
zap.Int("logs", len(logs)),
)

var err error

Check warning on line 99 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L99

Added line #L99 was not covered by tests
results := make([]any, len(logs))

for i, log := range logs {
// TODO: check to see if this can be optimized channels raft node provides
_, err := fsm.applyLog(log, lastIndex == i)
if err != nil {
isLast := len(logs)-1 == i
reload := fsm.shouldReload(log, isLast)
if _, err = fsm.applyLog(log, reload); err != nil {
fsm.logger.Error("Error applying log", zap.Error(err))
results[i] = err

Check warning on line 107 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L103-L107

Added lines #L103 - L107 were not covered by tests
}
}

if appliedIndex != 0 && lastLogIndex >= appliedIndex {
fsm.localState.AppliedIndex = lastLogIndex
if err = fsm.saveFSMState(); err != nil {
fsm.logger.Warn("failed to save applied index state",
zap.Uint64("applied_index", lastLogIndex),
)

Check warning on line 116 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L111-L116

Added lines #L111 - L116 were not covered by tests
}
fsm.cs.SetReady(true)

Check warning on line 118 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L118

Added line #L118 was not covered by tests
}

return results
}

func (fsm *dgateAdminFSM) Snapshot() (raft.FSMSnapshot, error) {
func (fsm *AdminFSM) saveFSMState() error {
fsm.logger.Debug("saving localState",
zap.Any("data", fsm.localState),
)
stateBytes, err := json.Marshal(fsm.localState)
if err != nil {
return err

Check warning on line 130 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L124-L130

Added lines #L124 - L130 were not covered by tests
}
return fsm.storage.Set([]byte("prev_state"), stateBytes)

Check warning on line 132 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L132

Added line #L132 was not covered by tests
}

func (fsm *AdminFSM) shouldReload(log *raft.Log, reload bool) bool {
if reload {
return log.Index >= fsm.localState.AppliedIndex

Check warning on line 137 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L135-L137

Added lines #L135 - L137 were not covered by tests
}
return false

Check warning on line 139 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L139

Added line #L139 was not covered by tests
}

func (fsm *AdminFSM) Snapshot() (raft.FSMSnapshot, error) {
fsm.cs = nil
fsm.logger.Warn("snapshots not supported")
return nil, errors.New("snapshots not supported")

Check warning on line 145 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L142-L145

Added lines #L142 - L145 were not covered by tests
}

func (fsm *dgateAdminFSM) Restore(rc io.ReadCloser) error {
func (fsm *AdminFSM) Restore(rc io.ReadCloser) error {
fsm.logger.Warn("snapshots not supported, cannot restore")
return nil

Check warning on line 150 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L148-L150

Added lines #L148 - L150 were not covered by tests
}
2 changes: 1 addition & 1 deletion internal/admin/admin_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func setupRaft(
adminConfig.Replication.AdvertScheme+"://(address)/raft",
)
fsmLogger := logger.Named("fsm")
adminFSM := newDGateAdminFSM(fsmLogger, cs)
adminFSM := newAdminFSM(fsmLogger, configStore, cs)

Check warning on line 104 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L104

Added line #L104 was not covered by tests
raftNode, err := raft.NewRaft(
raftConfig, adminFSM, logStore,
configStore, snapStore, transport,

Check warning on line 107 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L106-L107

Added lines #L106 - L107 were not covered by tests
Expand Down
75 changes: 39 additions & 36 deletions internal/admin/admin_routes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package admin

import (
"errors"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -28,50 +29,51 @@ func configureRoutes(
logger *zap.Logger,
cs changestate.ChangeState,
conf *config.DGateConfig,
) {
) error {
adminConfig := conf.AdminConfig
server.Use(func(next http.Handler) http.Handler {
ipList := iplist.NewIPList()
for _, address := range adminConfig.AllowList {
if strings.Contains(address, "/") {
if err := ipList.AddCIDRString(address); err != nil {
panic(fmt.Sprintf("invalid cidr address in admin.allow_list: %s", address))
}
} else {
if err := ipList.AddIPString(address); err != nil {
panic(fmt.Sprintf("invalid ip address in admin.allow_list: %s", address))
}
ipList := iplist.NewIPList()
for _, address := range adminConfig.AllowList {
if strings.Contains(address, "/") {
if err := ipList.AddCIDRString(address); err != nil {
return fmt.Errorf("invalid cidr address in admin.allow_list: %s", address)

Check warning on line 38 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L36-L38

Added lines #L36 - L38 were not covered by tests
}
} else {
if err := ipList.AddIPString(address); err != nil {
return fmt.Errorf("invalid ip address in admin.allow_list: %s", address)

Check warning on line 42 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L40-L42

Added lines #L40 - L42 were not covered by tests
}
}
// basic auth
var userMap map[string]string
// key auth
var keyMap map[string]struct{}
}
// basic auth
var userMap map[string]string
// key auth
var keyMap map[string]struct{}

switch adminConfig.AuthMethod {
case config.AuthMethodBasicAuth:
userMap = make(map[string]string)
if len(adminConfig.BasicAuth.Users) > 0 {
for i, user := range adminConfig.BasicAuth.Users {
if user.Username == "" || user.Password == "" {
panic(fmt.Sprintf("both username and password are required: admin.basic_auth.users[%d]", i))
}
userMap[user.Username] = user.Password
switch adminConfig.AuthMethod {
case config.AuthMethodBasicAuth:
userMap = make(map[string]string)
if len(adminConfig.BasicAuth.Users) > 0 {
for i, user := range adminConfig.BasicAuth.Users {
if user.Username == "" || user.Password == "" {
return errors.New(fmt.Sprintf("both username and password are required: admin.basic_auth.users[%d]", i))

Check warning on line 57 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L52-L57

Added lines #L52 - L57 were not covered by tests
}
userMap[user.Username] = user.Password

Check warning on line 59 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L59

Added line #L59 was not covered by tests
}
case config.AuthMethodKeyAuth:
keyMap = make(map[string]struct{})
if adminConfig.KeyAuth != nil && len(adminConfig.KeyAuth.Keys) > 0 {
if adminConfig.KeyAuth.QueryParamName != "" && adminConfig.KeyAuth.HeaderName != "" {
panic("only one of admin.key_auth.query_param_name or admin.key_auth.header_name can be set")
}
for _, key := range adminConfig.KeyAuth.Keys {
keyMap[key] = struct{}{}
}
}
case config.AuthMethodKeyAuth:
keyMap = make(map[string]struct{})
if adminConfig.KeyAuth != nil && len(adminConfig.KeyAuth.Keys) > 0 {
if adminConfig.KeyAuth.QueryParamName != "" && adminConfig.KeyAuth.HeaderName != "" {
return errors.New("only one of admin.key_auth.query_param_name or admin.key_auth.header_name can be set")

Check warning on line 66 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L62-L66

Added lines #L62 - L66 were not covered by tests
}
for _, key := range adminConfig.KeyAuth.Keys {
keyMap[key] = struct{}{}

Check warning on line 69 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}
case config.AuthMethodJWTAuth:
panic("JWT Auth is not supported yet")
}
case config.AuthMethodJWTAuth:
return errors.New("JWT Auth is not supported yet")

Check warning on line 73 in internal/admin/admin_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_routes.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}

server.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if ipList.Len() > 0 {
remoteIp := util.GetTrustedIP(r,
Expand Down Expand Up @@ -178,6 +180,7 @@ func configureRoutes(
misc.Handle("/metrics", promhttp.Handler())
}
})
return nil
}

func setupMetricProvider(
Expand Down
12 changes: 12 additions & 0 deletions internal/config/configtest/dgate_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,15 @@ func NewTestDGateConfig_DomainAndNamespaces2() *config.DGateConfig {
conf.DisableDefaultNamespace = false
return conf
}

func NewTestAdminConfig() *config.DGateConfig {
conf := NewTestDGateConfig()
conf.AdminConfig = &config.DGateAdminConfig{
Host: "localhost",
Port: 0,
TLS: &config.DGateTLSConfig{
Port: 0,
},
}
return conf
}
39 changes: 17 additions & 22 deletions internal/proxy/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,24 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (
return

Check warning on line 37 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L36-L37

Added lines #L36 - L37 were not covered by tests
}
}
// in memory storage for state restarts
if len(ps.changeLogs) > 0 {
xcl := ps.changeLogs[len(ps.changeLogs)-1]
if xcl.ID == cl.ID {
if r := ps.Raft(); r != nil && r.State() == raft.Leader {
return

Check warning on line 44 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}
ps.logger.Error("duplicate change log",
zap.String("id", cl.ID),
zap.Stringer("cmd", cl.Cmd),
)
return

Check warning on line 50 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L46-L50

Added lines #L46 - L50 were not covered by tests
}
}
ps.changeLogs = append(ps.changeLogs, cl)
}
}()
}

if len(ps.changeLogs) > 0 {
xcl := ps.changeLogs[len(ps.changeLogs)-1]
if xcl.ID == cl.ID {
if r := ps.Raft(); r != nil && r.State() == raft.Leader {
// FYI: we still need to store the change log
return nil
}
ps.logger.Error("duplicate change log",
zap.String("id", cl.ID),
zap.Stringer("cmd", cl.Cmd),
)
return errors.New("duplicate change log")
}
}

// apply change log to the state
if !cl.Cmd.IsNoop() {
defer func() {
Expand All @@ -77,9 +74,7 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (
})
}
}()
if cl.Cmd.Resource() == spec.Documents && !store {
return nil
} else if err = ps.processResource(cl); err != nil {
if err = ps.processResource(cl); err != nil {
ps.logger.Error("decoding or processing change log", zap.Error(err))
return
}
Expand Down Expand Up @@ -334,11 +329,11 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error {

func (ps *ProxyState) compactChangeLogs(logs []*spec.ChangeLog) (int, error) {
removeList := compactChangeLogsRemoveList(ps.logger, sliceutil.SliceCopy(logs))
removed, err := ps.store.DeleteChangeLogs(removeList)
err := ps.store.DeleteChangeLogs(removeList)

Check warning on line 332 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L332

Added line #L332 was not covered by tests
if err != nil {
return removed, err
return 0, err

Check warning on line 334 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L334

Added line #L334 was not covered by tests
}
return removed, nil
return len(logs), nil

Check warning on line 336 in internal/proxy/change_log.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/change_log.go#L336

Added line #L336 was not covered by tests
}

/*
Expand Down
4 changes: 2 additions & 2 deletions internal/proxy/dynamic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (ps *ProxyState) startProxyServer() {
cfg := ps.config.ProxyConfig
hostPort := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
ps.logger.Info("Starting proxy server on " + hostPort)
proxyHttpLogger := ps.Logger()
proxyHttpLogger := ps.logger.Named("http")
server := &http.Server{
Addr: hostPort,
Handler: ps,
Expand Down Expand Up @@ -324,7 +324,7 @@ func (ps *ProxyState) Stop() {

ps.logger.Info("Stopping proxy server")
defer os.Exit(0)
defer ps.Logger().Sync()
defer ps.logger.Sync()

Check warning on line 327 in internal/proxy/dynamic_proxy.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/dynamic_proxy.go#L327

Added line #L327 was not covered by tests

ps.proxyLock.Lock()
defer ps.proxyLock.Unlock()

Check warning on line 330 in internal/proxy/dynamic_proxy.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/dynamic_proxy.go#L330

Added line #L330 was not covered by tests
Expand Down
Loading

0 comments on commit 6df96f2

Please sign in to comment.