Skip to content

Commit

Permalink
manager: fix checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh committed Jan 4, 2021
1 parent 50c6f68 commit 39bbc46
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Status struct {
}

type WorkerCheckPoint struct {
LastInvokeTime time.Time
LastInvokeTime time.Time `json:"last_invoke_time"`
}

type CheckPoint struct {
workerInfo map[string]WorkerCheckPoint
WorkerInfo map[string]WorkerCheckPoint `json:"worker_info"`
}

// fromCheckpoint laods last invoke time from json
Expand Down Expand Up @@ -89,7 +89,7 @@ func NewManager(config *config.Config) (*Manager, error) {
if err != nil {
logger.Info("failed to parse checkpoint file")
} else {
for worker, info := range checkpoint.workerInfo {
for worker, info := range checkpoint.WorkerInfo {
workersLastInvokeTime[worker] = info.LastInvokeTime
}
}
Expand Down Expand Up @@ -120,9 +120,18 @@ func NewManager(config *config.Config) (*Manager, error) {
}

func (m *Manager) checkpoint() error {
file, _ := json.MarshalIndent(m.workersLastInvokeTime, "", " ")
ckptObj := &CheckPoint{WorkerInfo: make(map[string]WorkerCheckPoint)}
for k, t := range m.workersLastInvokeTime {
ckptObj.WorkerInfo[k] = WorkerCheckPoint{
LastInvokeTime: t,
}
}
file, err := json.MarshalIndent(ckptObj, "", " ")
if err != nil {
return err
}
ckpt := fmt.Sprintf("%s.tmp", m.config.Checkpoint)
err := ioutil.WriteFile(ckpt, file, 0644)
err = ioutil.WriteFile(ckpt, file, 0644)
if err != nil {
return err
}
Expand Down Expand Up @@ -184,6 +193,7 @@ func (m *Manager) Run() {
}).Debugf("Calling RunSync() to w %s", w.GetConfig()["name"])
go w.RunSync()
}
m.checkpoint()
for {
// wait until config.Interval seconds has elapsed
select {
Expand Down

0 comments on commit 39bbc46

Please sign in to comment.