diff --git a/management/server/scheduler.go b/management/server/scheduler.go index a35bdc30c..356348056 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -1,9 +1,10 @@ package server import ( - log "github.com/sirupsen/logrus" "sync" "time" + + log "github.com/sirupsen/logrus" ) // Scheduler is an interface which implementations can schedule and cancel jobs @@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool { cancel, ok := wm.jobs[ID] if ok { delete(wm.jobs, ID) - select { - case cancel <- struct{}{}: - log.Debugf("cancelled scheduled job %s", ID) - default: - log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) - return false - } - + close(cancel) + log.Debugf("cancelled scheduled job %s", ID) } return ok } @@ -90,25 +85,41 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne return } + ticker := time.NewTicker(in) + wm.jobs[ID] = cancel log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs)) go func() { - select { - case <-time.After(in): - log.Debugf("time to do a scheduled job %s", ID) - runIn, reschedule := job() - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - if reschedule { - go wm.Schedule(runIn, ID, job) + for { + select { + case <-ticker.C: + select { + case <-cancel: + log.Debugf("scheduled job %s was canceled, stop timer", ID) + ticker.Stop() + return + default: + log.Debugf("time to do a scheduled job %s", ID) + } + runIn, reschedule := job() + if !reschedule { + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + log.Debugf("job %s is not scheduled to run again", ID) + ticker.Stop() + return + } + // we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time + if runIn != in { + ticker.Reset(runIn) + } + case <-cancel: + log.Debugf("job %s was canceled, stopping timer", ID) + ticker.Stop() + return } - case <-cancel: - log.Debugf("stopped scheduled job %s ", ID) - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - return } + }() } diff --git a/management/server/scheduler_test.go b/management/server/scheduler_test.go index 0c0cef99b..4b2c2e30d 100644 --- a/management/server/scheduler_test.go +++ b/management/server/scheduler_test.go @@ -2,11 +2,12 @@ package server import ( "fmt" - "github.com/stretchr/testify/assert" "math/rand" "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestScheduler_Performance(t *testing.T) { @@ -36,15 +37,24 @@ func TestScheduler_Cancel(t *testing.T) { jobID1 := "test-scheduler-job-1" jobID2 := "test-scheduler-job-2" scheduler := NewDefaultScheduler() - scheduler.Schedule(2*time.Second, jobID1, func() (nextRunIn time.Duration, reschedule bool) { - return 0, false + tChan := make(chan struct{}) + p := []string{jobID1, jobID2} + scheduler.Schedule(2*time.Millisecond, jobID1, func() (nextRunIn time.Duration, reschedule bool) { + tt := p[0] + <-tChan + t.Logf("job %s", tt) + return 2 * time.Millisecond, true }) - scheduler.Schedule(2*time.Second, jobID2, func() (nextRunIn time.Duration, reschedule bool) { - return 0, false + scheduler.Schedule(2*time.Millisecond, jobID2, func() (nextRunIn time.Duration, reschedule bool) { + return 2 * time.Millisecond, true }) + time.Sleep(4 * time.Millisecond) assert.Len(t, scheduler.jobs, 2) scheduler.Cancel([]string{jobID1}) + close(tChan) + p = []string{} + time.Sleep(4 * time.Millisecond) assert.Len(t, scheduler.jobs, 1) assert.NotNil(t, scheduler.jobs[jobID2]) }