[management] scheduler cancel all jobs (#4158)

This commit is contained in:
Pedro Maia Costa
2025-07-24 16:25:21 +01:00
committed by GitHub
parent 0ea5d020a3
commit 1a9ea32c21
4 changed files with 61 additions and 6 deletions

2
go.mod
View File

@@ -63,7 +63,7 @@ require (
github.com/miekg/dns v1.1.59 github.com/miekg/dns v1.1.59
github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/nadoo/ipset v0.5.0 github.com/nadoo/ipset v0.5.0
github.com/netbirdio/management-integrations/integrations v0.0.0-20250718161635-83fb99b09b5a github.com/netbirdio/management-integrations/integrations v0.0.0-20250724151510-c007bc6b392c
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb
github.com/okta/okta-sdk-golang/v2 v2.18.0 github.com/okta/okta-sdk-golang/v2 v2.18.0
github.com/oschwald/maxminddb-golang v1.12.0 github.com/oschwald/maxminddb-golang v1.12.0

4
go.sum
View File

@@ -503,8 +503,8 @@ github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6S
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ= github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ=
github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e h1:PURA50S8u4mF6RrkYYCAvvPCixhqqEiEy3Ej6avh04c= github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e h1:PURA50S8u4mF6RrkYYCAvvPCixhqqEiEy3Ej6avh04c=
github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e/go.mod h1:YMLU7qbKfVjmEv7EoZPIVEI+kNYxWCdPK3VS0BU+U4Q= github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e/go.mod h1:YMLU7qbKfVjmEv7EoZPIVEI+kNYxWCdPK3VS0BU+U4Q=
github.com/netbirdio/management-integrations/integrations v0.0.0-20250718161635-83fb99b09b5a h1:Kmq74+axAiJrD98+uAr53sIuj/zwMrak05Ofoy4SWYU= github.com/netbirdio/management-integrations/integrations v0.0.0-20250724151510-c007bc6b392c h1:OtX903X0FKEE+fcsp/P2701md7X/xbi/W/ojWIJNKSk=
github.com/netbirdio/management-integrations/integrations v0.0.0-20250718161635-83fb99b09b5a/go.mod h1:Gi9raplYzCCyh07Olw/DVfCJTFgpr1WCXJ/Q+8TSA9Q= github.com/netbirdio/management-integrations/integrations v0.0.0-20250724151510-c007bc6b392c/go.mod h1:Gi9raplYzCCyh07Olw/DVfCJTFgpr1WCXJ/Q+8TSA9Q=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8= github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM= github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb h1:Cr6age+ePALqlSvtp7wc6lYY97XN7rkD1K4XEDmY+TU= github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb h1:Cr6age+ePALqlSvtp7wc6lYY97XN7rkD1K4XEDmY+TU=

View File

@@ -11,14 +11,17 @@ import (
// Scheduler is an interface which implementations can schedule and cancel jobs // Scheduler is an interface which implementations can schedule and cancel jobs
type Scheduler interface { type Scheduler interface {
Cancel(ctx context.Context, IDs []string) Cancel(ctx context.Context, IDs []string)
CancelAll(ctx context.Context)
Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
IsSchedulerRunning(ID string) bool IsSchedulerRunning(ID string) bool
} }
// MockScheduler is a mock implementation of Scheduler // MockScheduler is a mock implementation of Scheduler
type MockScheduler struct { type MockScheduler struct {
CancelFunc func(ctx context.Context, IDs []string) CancelFunc func(ctx context.Context, IDs []string)
ScheduleFunc func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) CancelAllFunc func(ctx context.Context)
ScheduleFunc func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
IsSchedulerRunningFunc func(ID string) bool
} }
// Cancel mocks the Cancel function of the Scheduler interface // Cancel mocks the Cancel function of the Scheduler interface
@@ -30,6 +33,15 @@ func (mock *MockScheduler) Cancel(ctx context.Context, IDs []string) {
log.WithContext(ctx).Warnf("MockScheduler doesn't have Cancel function defined ") log.WithContext(ctx).Warnf("MockScheduler doesn't have Cancel function defined ")
} }
// CancelAll mocks the CancelAll function of the Scheduler interface
func (mock *MockScheduler) CancelAll(ctx context.Context) {
if mock.CancelAllFunc != nil {
mock.CancelAllFunc(ctx)
return
}
log.WithContext(ctx).Warnf("MockScheduler doesn't have CancelAll function defined ")
}
// Schedule mocks the Schedule function of the Scheduler interface // Schedule mocks the Schedule function of the Scheduler interface
func (mock *MockScheduler) Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) { func (mock *MockScheduler) Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
if mock.ScheduleFunc != nil { if mock.ScheduleFunc != nil {
@@ -40,7 +52,9 @@ func (mock *MockScheduler) Schedule(ctx context.Context, in time.Duration, ID st
} }
func (mock *MockScheduler) IsSchedulerRunning(ID string) bool { func (mock *MockScheduler) IsSchedulerRunning(ID string) bool {
// MockScheduler does not implement IsSchedulerRunning, so we return false if mock.IsSchedulerRunningFunc != nil {
return mock.IsSchedulerRunningFunc(ID)
}
log.Warnf("MockScheduler doesn't have IsSchedulerRunning function defined") log.Warnf("MockScheduler doesn't have IsSchedulerRunning function defined")
return false return false
} }
@@ -52,6 +66,15 @@ type DefaultScheduler struct {
mu *sync.Mutex mu *sync.Mutex
} }
func (wm *DefaultScheduler) CancelAll(ctx context.Context) {
wm.mu.Lock()
defer wm.mu.Unlock()
for id := range wm.jobs {
wm.cancel(ctx, id)
}
}
// NewDefaultScheduler creates an instance of a DefaultScheduler // NewDefaultScheduler creates an instance of a DefaultScheduler
func NewDefaultScheduler() *DefaultScheduler { func NewDefaultScheduler() *DefaultScheduler {
return &DefaultScheduler{ return &DefaultScheduler{

View File

@@ -75,6 +75,38 @@ func TestScheduler_Cancel(t *testing.T) {
assert.NotNil(t, scheduler.jobs[jobID2]) assert.NotNil(t, scheduler.jobs[jobID2])
} }
func TestScheduler_CancelAll(t *testing.T) {
jobID1 := "test-scheduler-job-1"
jobID2 := "test-scheduler-job-2"
scheduler := NewDefaultScheduler()
tChan := make(chan struct{})
p := []string{jobID1, jobID2}
scheduletime := 2 * time.Millisecond
sleepTime := 4 * time.Millisecond
if runtime.GOOS == "windows" {
// sleep and ticker are slower on windows see https://github.com/golang/go/issues/44343
sleepTime = 20 * time.Millisecond
}
scheduler.Schedule(context.Background(), scheduletime, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
tt := p[0]
<-tChan
t.Logf("job %s", tt)
return scheduletime, true
})
scheduler.Schedule(context.Background(), scheduletime, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
return scheduletime, true
})
time.Sleep(sleepTime)
assert.Len(t, scheduler.jobs, 2)
scheduler.CancelAll(context.Background())
close(tChan)
p = []string{}
time.Sleep(sleepTime)
assert.Len(t, scheduler.jobs, 0)
}
func TestScheduler_Schedule(t *testing.T) { func TestScheduler_Schedule(t *testing.T) {
jobID := "test-scheduler-job-1" jobID := "test-scheduler-job-1"
scheduler := NewDefaultScheduler() scheduler := NewDefaultScheduler()