[client] Use ctx.Err() instead of gRPC codes.Canceled to detect shutdown (#6019)

Detecting shutdown by inspecting the gRPC status code conflates a local
context cancellation with a server- or proxy-sent codes.Canceled. When
the latter occurs (e.g. an intermediary proxy resets the stream), the
retry loop silently terminates and the client never reconnects.

Switch to ctx.Err() in the signal Receive loop and management Sync/Job
handlers, and stop matching gRPC Canceled/DeadlineExceeded in the flow
client's isContextDone helper. With this change, a server-sent Canceled
is treated as a transient error and the backoff retry loop continues.
This commit is contained in:
Zoltan Papp
2026-05-04 11:59:25 +02:00
committed by GitHub
parent a21f6ecb0a
commit a547fc74ed
3 changed files with 21 additions and 35 deletions

View File

@@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream(
for {
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
if err != nil {
if ctx.Err() != nil {
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
c.notifyDisconnected(err)
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return err
case codes.Unimplemented:
log.Warn("Job feature is not supported by the current management server version. " +
"Please update the management service to use this feature.")
return nil
default:
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
if jobReq == nil || len(jobReq.ID) == 0 {
@@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
if err != nil {
c.notifyDisconnected(err)
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
default:
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
if ctx.Err() != nil {
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
}
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}
return nil