diff --git a/management/server/scheduler.go b/management/server/scheduler.go index a35bdc30c..ba3d03fc0 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -1,9 +1,10 @@ package server import ( - log "github.com/sirupsen/logrus" "sync" "time" + + log "github.com/sirupsen/logrus" ) // Scheduler is an interface which implementations can schedule and cancel jobs @@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool { cancel, ok := wm.jobs[ID] if ok { delete(wm.jobs, ID) - select { - case cancel <- struct{}{}: - log.Debugf("cancelled scheduled job %s", ID) - default: - log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) - return false - } - + close(cancel) + log.Debugf("cancelled scheduled job %s", ID) } return ok } @@ -90,25 +85,36 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne return } + ticker := time.NewTicker(in) + wm.jobs[ID] = cancel log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs)) go func() { - select { - case <-time.After(in): - log.Debugf("time to do a scheduled job %s", ID) - runIn, reschedule := job() - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - if reschedule { - go wm.Schedule(runIn, ID, job) + for { + select { + case <-ticker.C: + log.Debugf("time to do a scheduled job %s", ID) + runIn, reschedule := job() + if !reschedule { + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + log.Debugf("job %s is not scheduled to run again", ID) + ticker.Stop() + return + } + if runIn != in { // do we need to compare it with the original duration? + ticker.Reset(runIn) + } + case <-cancel: + ticker.Stop() + log.Debugf("stopped scheduled job %s ", ID) + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + return } - case <-cancel: - log.Debugf("stopped scheduled job %s ", ID) - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - return } + }() }