sync limit fix

This commit is contained in:
crn4
2025-10-17 17:11:15 +02:00
parent 9914212ce5
commit b8d9386466

View File

@@ -175,7 +175,6 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later") return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later")
} }
s.syncSem.Add(1) s.syncSem.Add(1)
defer s.syncSem.Add(-1)
reqStart := time.Now() reqStart := time.Now()
@@ -184,6 +183,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
syncReq := &proto.SyncRequest{} syncReq := &proto.SyncRequest{}
peerKey, err := s.parseRequest(ctx, req, syncReq) peerKey, err := s.parseRequest(ctx, req, syncReq)
if err != nil { if err != nil {
s.syncSem.Add(-1)
return err return err
} }
realIP := getRealIP(ctx) realIP := getRealIP(ctx)
@@ -198,6 +198,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed) log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
} }
if s.blockPeersWithSameConfig { if s.blockPeersWithSameConfig {
s.syncSem.Add(-1)
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn) return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
} }
} }
@@ -222,8 +223,10 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN") ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String()) log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound { if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
s.syncSem.Add(-1)
return status.Errorf(codes.PermissionDenied, "peer is not registered") return status.Errorf(codes.PermissionDenied, "peer is not registered")
} }
s.syncSem.Add(-1)
return err return err
} }
@@ -239,12 +242,14 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP) peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err) log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
s.syncSem.Add(-1)
return mapError(ctx, err) return mapError(ctx, err)
} }
err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv) err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv)
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err) log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
s.syncSem.Add(-1)
return err return err
} }
@@ -263,6 +268,8 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
s.syncSem.Add(-1)
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv) return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
} }