Files
netbird/management/server/scheduler.go
Maycon Santos 87d1fc3a2f Handle canceling schedule and avoid recursive call
Using time.Ticker allows us to avoid a recursive call that could leave a schedule running and cause a deadlock if no goroutine is listening for cancel calls

switch to closing channel
2024-02-27 18:38:04 +01:00

121 lines
3.3 KiB
Go

package server
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Scheduler is an interface which implementations can schedule and cancel jobs.
type Scheduler interface {
// Cancel cancels the jobs identified by the given IDs.
Cancel(IDs []string)
// Schedule registers job under ID to run after the duration in.
// The job's results tell the scheduler whether it should run again (reschedule)
// and after which delay (nextRunIn).
Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
}
// MockScheduler is a mock implementation of Scheduler.
// Each method forwards to the corresponding function field when it is set.
type MockScheduler struct {
// CancelFunc is invoked by Cancel; when nil, Cancel only logs an error.
CancelFunc func(IDs []string)
// ScheduleFunc is invoked by Schedule; when nil, Schedule only logs an error.
ScheduleFunc func(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
}
// Cancel mocks the Cancel function of the Scheduler interface.
// It delegates to CancelFunc when one is configured and logs an error otherwise.
func (mock *MockScheduler) Cancel(IDs []string) {
	if mock.CancelFunc == nil {
		log.Errorf("MockScheduler doesn't have Cancel function defined ")
		return
	}
	mock.CancelFunc(IDs)
}
// Schedule mocks the Schedule function of the Scheduler interface.
// It delegates to ScheduleFunc when one is configured and logs an error otherwise.
func (mock *MockScheduler) Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
	if mock.ScheduleFunc == nil {
		log.Errorf("MockScheduler doesn't have Schedule function defined")
		return
	}
	mock.ScheduleFunc(in, ID, job)
}
// DefaultScheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them.
type DefaultScheduler struct {
// jobs map holds cancellation channels indexed by the job ID;
// closing a job's channel signals its goroutine to stop.
jobs map[string]chan struct{}
// mu guards access to the jobs map.
mu *sync.Mutex
}
// NewDefaultScheduler creates an instance of a DefaultScheduler
// with an empty job registry and a fresh mutex.
func NewDefaultScheduler() *DefaultScheduler {
	scheduler := new(DefaultScheduler)
	scheduler.mu = new(sync.Mutex)
	scheduler.jobs = map[string]chan struct{}{}
	return scheduler
}
// cancel unregisters the job with the given ID and closes its cancellation
// channel. It reports whether such a job was registered.
// It does not lock wm.mu itself; Cancel takes the lock before calling it.
func (wm *DefaultScheduler) cancel(ID string) bool {
	stop, found := wm.jobs[ID]
	if !found {
		return false
	}

	delete(wm.jobs, ID)
	close(stop)
	log.Debugf("cancelled scheduled job %s", ID)
	return true
}
// Cancel unregisters every job listed in IDs and closes its cancellation channel.
// IDs that don't correspond to a registered job are skipped silently.
func (wm *DefaultScheduler) Cancel(IDs []string) {
	wm.mu.Lock()
	defer wm.mu.Unlock()

	for i := range IDs {
		wm.cancel(IDs[i])
	}
}
// Schedule registers job under ID and runs it for the first time after the
// duration in, which must be positive (time.NewTicker panics otherwise).
//
// After each run the job reports (nextRunIn, reschedule). When reschedule is
// false the job is removed from the scheduler; otherwise it runs again after
// nextRunIn. A non-positive nextRunIn is ignored and the current interval is kept.
// If a job with the provided ID already exists, a new one won't be scheduled.
// A scheduled job can be stopped with Cancel.
func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
	wm.mu.Lock()
	defer wm.mu.Unlock()

	if _, ok := wm.jobs[ID]; ok {
		log.Debugf("couldn't schedule a job %s because it already exists. There are %d total jobs scheduled.",
			ID, len(wm.jobs))
		return
	}

	cancel := make(chan struct{})
	ticker := time.NewTicker(in)
	wm.jobs[ID] = cancel
	log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))

	go func() {
		defer ticker.Stop()

		// period is the interval the ticker currently runs with. Comparing the
		// job's answer against it (rather than against the initial duration)
		// lets a job return to its original interval after a change, while
		// still avoiding a Reset that would needlessly restart the ticker.
		period := in

		for {
			select {
			case <-ticker.C:
				// select chooses at random when both channels are ready, so
				// give cancellation priority before running the job.
				select {
				case <-cancel:
					log.Debugf("stopped scheduled job %s ", ID)
					return
				default:
				}

				log.Debugf("time to do a scheduled job %s", ID)
				runIn, reschedule := job()
				if !reschedule {
					wm.mu.Lock()
					// Only drop our own registration: the job may have been
					// cancelled and re-scheduled under the same ID while it
					// was running, and that newer entry must survive.
					if current, ok := wm.jobs[ID]; ok && current == cancel {
						delete(wm.jobs, ID)
					}
					wm.mu.Unlock()
					log.Debugf("job %s is not scheduled to run again", ID)
					return
				}

				if runIn <= 0 {
					// ticker.Reset panics on non-positive durations, which
					// would take the whole process down from this goroutine.
					log.Warnf("job %s returned a non-positive interval %s, keeping %s", ID, runIn, period)
					continue
				}
				if runIn != period {
					ticker.Reset(runIn)
					period = runIn
				}
			case <-cancel:
				// The channel is closed by cancel(), which has already removed
				// the map entry under the lock. Deleting by ID here could only
				// remove a newer job registered with the same ID.
				log.Debugf("stopped scheduled job %s ", ID)
				return
			}
		}
	}()
}