mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Compare commits
3 Commits
feature/re
...
fix/handle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5af9bbfec9 | ||
|
|
19873db1a9 | ||
|
|
87d1fc3a2f |
@@ -1,9 +1,10 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Scheduler is an interface which implementations can schedule and cancel jobs
|
// Scheduler is an interface which implementations can schedule and cancel jobs
|
||||||
@@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool {
|
|||||||
cancel, ok := wm.jobs[ID]
|
cancel, ok := wm.jobs[ID]
|
||||||
if ok {
|
if ok {
|
||||||
delete(wm.jobs, ID)
|
delete(wm.jobs, ID)
|
||||||
select {
|
close(cancel)
|
||||||
case cancel <- struct{}{}:
|
log.Debugf("cancelled scheduled job %s", ID)
|
||||||
log.Debugf("cancelled scheduled job %s", ID)
|
|
||||||
default:
|
|
||||||
log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
@@ -90,25 +85,41 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(in)
|
||||||
|
|
||||||
wm.jobs[ID] = cancel
|
wm.jobs[ID] = cancel
|
||||||
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
|
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
for {
|
||||||
case <-time.After(in):
|
select {
|
||||||
log.Debugf("time to do a scheduled job %s", ID)
|
case <-ticker.C:
|
||||||
runIn, reschedule := job()
|
select {
|
||||||
wm.mu.Lock()
|
case <-cancel:
|
||||||
defer wm.mu.Unlock()
|
log.Debugf("scheduled job %s was canceled, stop timer", ID)
|
||||||
delete(wm.jobs, ID)
|
ticker.Stop()
|
||||||
if reschedule {
|
return
|
||||||
go wm.Schedule(runIn, ID, job)
|
default:
|
||||||
|
log.Debugf("time to do a scheduled job %s", ID)
|
||||||
|
}
|
||||||
|
runIn, reschedule := job()
|
||||||
|
if !reschedule {
|
||||||
|
wm.mu.Lock()
|
||||||
|
defer wm.mu.Unlock()
|
||||||
|
delete(wm.jobs, ID)
|
||||||
|
log.Debugf("job %s is not scheduled to run again", ID)
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time
|
||||||
|
if runIn != in {
|
||||||
|
ticker.Reset(runIn)
|
||||||
|
}
|
||||||
|
case <-cancel:
|
||||||
|
log.Debugf("job %s was canceled, stopping timer", ID)
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
case <-cancel:
|
|
||||||
log.Debugf("stopped scheduled job %s ", ID)
|
|
||||||
wm.mu.Lock()
|
|
||||||
defer wm.mu.Unlock()
|
|
||||||
delete(wm.jobs, ID)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestScheduler_Performance(t *testing.T) {
|
func TestScheduler_Performance(t *testing.T) {
|
||||||
@@ -36,15 +37,24 @@ func TestScheduler_Cancel(t *testing.T) {
|
|||||||
jobID1 := "test-scheduler-job-1"
|
jobID1 := "test-scheduler-job-1"
|
||||||
jobID2 := "test-scheduler-job-2"
|
jobID2 := "test-scheduler-job-2"
|
||||||
scheduler := NewDefaultScheduler()
|
scheduler := NewDefaultScheduler()
|
||||||
scheduler.Schedule(2*time.Second, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
|
tChan := make(chan struct{})
|
||||||
return 0, false
|
p := []string{jobID1, jobID2}
|
||||||
|
scheduler.Schedule(2*time.Millisecond, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
|
||||||
|
tt := p[0]
|
||||||
|
<-tChan
|
||||||
|
t.Logf("job %s", tt)
|
||||||
|
return 2 * time.Millisecond, true
|
||||||
})
|
})
|
||||||
scheduler.Schedule(2*time.Second, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
|
scheduler.Schedule(2*time.Millisecond, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
|
||||||
return 0, false
|
return 2 * time.Millisecond, true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
time.Sleep(4 * time.Millisecond)
|
||||||
assert.Len(t, scheduler.jobs, 2)
|
assert.Len(t, scheduler.jobs, 2)
|
||||||
scheduler.Cancel([]string{jobID1})
|
scheduler.Cancel([]string{jobID1})
|
||||||
|
close(tChan)
|
||||||
|
p = []string{}
|
||||||
|
time.Sleep(4 * time.Millisecond)
|
||||||
assert.Len(t, scheduler.jobs, 1)
|
assert.Len(t, scheduler.jobs, 1)
|
||||||
assert.NotNil(t, scheduler.jobs[jobID2])
|
assert.NotNil(t, scheduler.jobs[jobID2])
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user