check if job is running

This commit is contained in:
Pascal Fischer
2025-01-16 21:30:56 +01:00
parent f597379be5
commit b6a558b523
3 changed files with 37 additions and 4 deletions

View File

@@ -485,6 +485,9 @@ func (am *DefaultAccountManager) peerLoginExpirationJob(ctx context.Context, acc
}
func (am *DefaultAccountManager) checkAndSchedulePeerLoginExpiration(ctx context.Context, accountID string) {
if am.peerLoginExpiry.IsJobRunning(accountID) {
return
}
am.peerLoginExpiry.Cancel(ctx, []string{accountID})
if nextRun, ok := am.getNextPeerExpiration(ctx, accountID); ok {
go am.peerLoginExpiry.Schedule(ctx, nextRun, accountID, am.peerLoginExpirationJob(ctx, accountID))

View File

@@ -122,6 +122,7 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, accountID string) error {
var peer *nbpeer.Peer
var expired bool
var settings *types.Settings
var err error
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
@@ -130,6 +131,11 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
return err
}
settings, err = transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return err
}
expired, err = updatePeerStatusAndLocation(ctx, am.geo, transaction, peer, connected, realIP, accountID)
return err
})
@@ -143,6 +149,16 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
am.UpdateAccountPeers(ctx, accountID)
}
if peer.AddedWithSSOLogin() {
if peer.LoginExpirationEnabled && settings.PeerLoginExpirationEnabled {
am.checkAndSchedulePeerLoginExpiration(ctx, accountID)
}
if peer.InactivityExpirationEnabled && settings.PeerInactivityExpirationEnabled {
am.checkAndSchedulePeerInactivityExpiration(ctx, accountID)
}
}
return nil
}

View File

@@ -12,6 +12,7 @@ import (
type Scheduler interface {
Cancel(ctx context.Context, IDs []string)
Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
IsJobRunning(ID string) bool
}
// MockScheduler is a mock implementation of Scheduler
@@ -38,18 +39,24 @@ func (mock *MockScheduler) Schedule(ctx context.Context, in time.Duration, ID st
log.WithContext(ctx).Errorf("MockScheduler doesn't have Schedule function defined")
}
func (mock *MockScheduler) IsJobRunning(_ string) bool {
return true
}
// DefaultScheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them.
type DefaultScheduler struct {
// jobs map holds cancellation channels indexed by the job ID
jobs map[string]chan struct{}
mu *sync.Mutex
jobs map[string]chan struct{}
mu *sync.Mutex
isJobRunning map[string]bool
}
// NewDefaultScheduler creates an instance of a DefaultScheduler
func NewDefaultScheduler() *DefaultScheduler {
return &DefaultScheduler{
jobs: make(map[string]chan struct{}),
mu: &sync.Mutex{},
jobs: make(map[string]chan struct{}),
isJobRunning: make(map[string]bool),
mu: &sync.Mutex{},
}
}
@@ -87,6 +94,7 @@ func (wm *DefaultScheduler) Schedule(ctx context.Context, in time.Duration, ID s
}
ticker := time.NewTicker(in)
wm.isJobRunning[ID] = true
wm.jobs[ID] = cancel
log.WithContext(ctx).Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
@@ -98,6 +106,7 @@ func (wm *DefaultScheduler) Schedule(ctx context.Context, in time.Duration, ID s
case <-cancel:
log.WithContext(ctx).Debugf("scheduled job %s was canceled, stop timer", ID)
ticker.Stop()
wm.isJobRunning[ID] = false
return
default:
log.WithContext(ctx).Debugf("time to do a scheduled job %s", ID)
@@ -109,6 +118,7 @@ func (wm *DefaultScheduler) Schedule(ctx context.Context, in time.Duration, ID s
delete(wm.jobs, ID)
log.WithContext(ctx).Debugf("job %s is not scheduled to run again", ID)
ticker.Stop()
wm.isJobRunning[ID] = false
return
}
// we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time
@@ -124,3 +134,7 @@ func (wm *DefaultScheduler) Schedule(ctx context.Context, in time.Duration, ID s
}()
}
func (wm *DefaultScheduler) IsJobRunning(ID string) bool {
return wm.isJobRunning[ID]
}