apply feedbacks 1

This commit is contained in:
aliamerj
2025-08-22 20:57:50 +03:00
parent b7f0088fe3
commit cc1338f92d
10 changed files with 353 additions and 120 deletions

View File

@@ -127,24 +127,20 @@ func GetKeyQueryCondition(s *SqlStore) string {
}
// SaveJob persists a job in DB
func (s *SqlStore) SaveJob(ctx context.Context, job *types.Job) error {
start := time.Now()
defer func() {
if s.metrics != nil {
s.metrics.StoreMetrics().CountPersistenceDuration(time.Since(start))
}
}()
return s.db.WithContext(ctx).
Clauses(clause.OnConflict{UpdateAll: true}).
Create(job).Error
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
result := s.db.Create(job)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to create job in store")
}
return nil
}
// job was pending for too long and has been cancelled
// todo call it when we first start the jobChannel to make sure no stuck jobs
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error {
now := time.Now().UTC()
return s.db.WithContext(ctx).
return s.db.
Model(&types.Job{}).
Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID).
Updates(map[string]any{
@@ -155,9 +151,9 @@ func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) e
}
// GetJobByID fetches job by ID
func (s *SqlStore) GetJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
var job types.Job
err := s.db.WithContext(ctx).
err := s.db.
Where(accountAndIDQueryCondition, accountID, jobID).
First(&job).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
@@ -167,12 +163,13 @@ func (s *SqlStore) GetJobByID(ctx context.Context, accountID, jobID string) (*ty
}
// get all jobs
func (s *SqlStore) GetJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
var jobs []*types.Job
err := s.db.WithContext(ctx).
err := s.db.
Where(accountAndPeerIDQueryCondition, accountID, peerID).
Order("created_at DESC").
Find(&jobs).Error
if err != nil {
return nil, err
}
@@ -180,19 +177,26 @@ func (s *SqlStore) GetJobs(ctx context.Context, accountID, peerID string) ([]*ty
return jobs, nil
}
func (s *SqlStore) CompleteJob(ctx context.Context, accountID, jobID, result string, failedReason string) error {
job, err := s.GetJobByID(ctx, accountID, jobID)
if err != nil {
return err
}
// mark it as succeeded or failed
if result != "" && failedReason == "" {
job.MarkSucceeded(result)
} else {
job.MarkFailed(failedReason)
func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error {
now := time.Now().UTC()
updates := map[string]any{
"completed_at": now,
}
return s.db.WithContext(ctx).Save(job).Error
if result != "" && failedReason == "" {
updates["status"] = types.JobStatusSucceeded
updates["result"] = result
updates["failed_reason"] = ""
} else {
updates["status"] = types.JobStatusFailed
updates["failed_reason"] = failedReason
}
return s.db.
Model(&types.Job{}).
Where(accountAndIDQueryCondition, accountID, jobID).
Updates(updates).Error
}
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock