allow ACK messages on signal

This commit is contained in:
Pascal Fischer
2025-09-25 14:39:23 +02:00
parent 9184a0c6ac
commit d9c585f575
2 changed files with 22 additions and 12 deletions

View File

@@ -8,9 +8,9 @@ import (
"fmt"
"net"
"net/http"
// nolint:gosec
_ "net/http/pprof"
"os"
"strings"
"time"
@@ -85,7 +85,9 @@ var (
RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
startPprof()
if os.Getenv("NB_PPROF_ENABLE") == "true" {
startPprof()
}
opts, certManager, err := getTLSConfigurations()
if err != nil {
@@ -132,7 +134,7 @@ var (
log.Infof("running gRPC server: %s", grpcListener.Addr().String())
}
if signalPort != 10000 {
if signalPort != 10000 && os.Getenv("NB_DISABLE_FALLBACK_GRPC") != "true" {
// The Signal gRPC server was running on port 10000 previously. Old agents that are already connected to Signal
// are using port 10000. For compatibility purposes we keep running a 2nd gRPC server on port 10000.
compatListener, err = serveGRPC(grpcServer, 10000)

View File

@@ -101,7 +101,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
return &proto.EncryptedMessage{}, nil
}
return s.dispatcher.SendMessage(ctx, msg)
return s.dispatcher.SendMessage(ctx, msg, false)
}
// SendWithDeliveryCheck forwards a message to the signal peer with error handling
@@ -115,11 +115,16 @@ func (s *Server) SendWithDeliveryCheck(ctx context.Context, msg *proto.Encrypted
if _, found := s.registry.Get(msg.RemoteKey); found {
// todo error handling here too
s.forwardMessageToPeer(ctx, msg)
return &emptypb.Empty{}, nil
err := s.forwardMessageToPeer(ctx, msg)
return &emptypb.Empty{}, err
}
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
//return s.dispatcher.SendMessage(ctx, msg)
msg, err := s.dispatcher.SendMessage(ctx, msg, true)
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
// ConnectStream connects to the exchange stream
@@ -177,7 +182,7 @@ func (s *Server) DeregisterPeer(p *peer.Peer) {
s.registry.Deregister(p)
}
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) error {
log.Tracef("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
getRegistrationStart := time.Now()
@@ -189,7 +194,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
log.Tracef("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
// todo respond to the sender?
return
return fmt.Errorf("destination peer not connected")
}
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
@@ -210,7 +215,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
if err != nil {
log.Tracef("error while forwarding message from peer [%s] to peer [%s]: %v", msg.Key, msg.RemoteKey, err)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
return
return fmt.Errorf("error sending message to peer: %v", err)
}
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
s.metrics.MessagesForwarded.Add(ctx, 1)
@@ -219,10 +224,13 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
case <-dstPeer.Stream.Context().Done():
log.Tracef("failed to forward message from peer [%s] to peer [%s]: destination peer disconnected", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeDisconnected)))
return fmt.Errorf("destination peer disconnected")
case <-time.After(s.sendTimeout):
dstPeer.Cancel() // cancel the peer context to trigger deregistration
log.Tracef("failed to forward message from peer [%s] to peer [%s]: send timeout", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeTimeout)))
return fmt.Errorf("sending message to peer timeout")
}
return nil
}