From 87d1fc3a2fb2409a287b3d05c6f12193bc6aa520 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Tue, 27 Feb 2024 18:38:04 +0100 Subject: [PATCH] Handle canceling schedule and avoid recursive call Using time.Ticker allow us to avoid recursive call that may end up in schedule running and possible deadlock if no routine is listening for cancel calls switch to closing channel --- management/server/scheduler.go | 54 +++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/management/server/scheduler.go b/management/server/scheduler.go index a35bdc30c..ba3d03fc0 100644 --- a/management/server/scheduler.go +++ b/management/server/scheduler.go @@ -1,9 +1,10 @@ package server import ( - log "github.com/sirupsen/logrus" "sync" "time" + + log "github.com/sirupsen/logrus" ) // Scheduler is an interface which implementations can schedule and cancel jobs @@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool { cancel, ok := wm.jobs[ID] if ok { delete(wm.jobs, ID) - select { - case cancel <- struct{}{}: - log.Debugf("cancelled scheduled job %s", ID) - default: - log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID) - return false - } - + close(cancel) + log.Debugf("cancelled scheduled job %s", ID) } return ok } @@ -90,25 +85,36 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne return } + ticker := time.NewTicker(in) + wm.jobs[ID] = cancel log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs)) go func() { - select { - case <-time.After(in): - log.Debugf("time to do a scheduled job %s", ID) - runIn, reschedule := job() - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - if reschedule { - go wm.Schedule(runIn, ID, job) + for { + select { + case <-ticker.C: + log.Debugf("time to do a scheduled job %s", ID) + runIn, reschedule := job() + if !reschedule { + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + log.Debugf("job %s is not scheduled to run again", ID) + ticker.Stop() + return + } + if runIn != in { // do we need to compare it with the original duration? + ticker.Reset(runIn) + } + case <-cancel: + ticker.Stop() + log.Debugf("stopped scheduled job %s ", ID) + wm.mu.Lock() + defer wm.mu.Unlock() + delete(wm.jobs, ID) + return } - case <-cancel: - log.Debugf("stopped scheduled job %s ", ID) - wm.mu.Lock() - defer wm.mu.Unlock() - delete(wm.jobs, ID) - return } + }() }