diff --git a/signal/cmd/run.go b/signal/cmd/run.go index 1d76fa4e4..dea90ddc3 100644 --- a/signal/cmd/run.go +++ b/signal/cmd/run.go @@ -8,9 +8,9 @@ import ( "fmt" "net" "net/http" - // nolint:gosec _ "net/http/pprof" + "os" "strings" "time" @@ -85,7 +85,9 @@ var ( RunE: func(cmd *cobra.Command, args []string) error { flag.Parse() - startPprof() + if os.Getenv("NB_PPROF_ENABLE") == "true" { + startPprof() + } opts, certManager, err := getTLSConfigurations() if err != nil { @@ -132,7 +134,7 @@ var ( log.Infof("running gRPC server: %s", grpcListener.Addr().String()) } - if signalPort != 10000 { + if signalPort != 10000 && os.Getenv("NB_DISABLE_FALLBACK_GRPC") != "true" { // The Signal gRPC server was running on port 10000 previously. Old agents that are already connected to Signal // are using port 10000. For compatibility purposes we keep running a 2nd gRPC server on port 10000. compatListener, err = serveGRPC(grpcServer, 10000) diff --git a/signal/server/signal.go b/signal/server/signal.go index aa92b53d3..47beda29b 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -101,7 +101,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. return &proto.EncryptedMessage{}, nil } - return s.dispatcher.SendMessage(ctx, msg) + return s.dispatcher.SendMessage(ctx, msg, false) } // SendWithDeliveryCheck forwards a message to the signal peer with error handling @@ -115,11 +115,16 @@ func (s *Server) SendWithDeliveryCheck(ctx context.Context, msg *proto.Encrypted if _, found := s.registry.Get(msg.RemoteKey); found { // todo error handling here too - s.forwardMessageToPeer(ctx, msg) - return &emptypb.Empty{}, nil + err := s.forwardMessageToPeer(ctx, msg) + return &emptypb.Empty{}, err } - return nil, status.Errorf(codes.NotFound, "remote peer not connected") - //return s.dispatcher.SendMessage(ctx, msg) + + msg, err := s.dispatcher.SendMessage(ctx, msg, true) + if err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil } // ConnectStream connects to the exchange stream @@ -177,7 +182,7 @@ func (s *Server) DeregisterPeer(p *peer.Peer) { s.registry.Deregister(p) } -func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) { +func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) error { log.Tracef("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey) getRegistrationStart := time.Now() @@ -189,7 +194,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected))) log.Tracef("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey) // todo respond to the sender? - return + return fmt.Errorf("destination peer not connected") } s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound))) @@ -210,7 +215,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM if err != nil { log.Tracef("error while forwarding message from peer [%s] to peer [%s]: %v", msg.Key, msg.RemoteKey, err) s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) - return + return fmt.Errorf("error sending message to peer: %v", err) } s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream))) s.metrics.MessagesForwarded.Add(ctx, 1) @@ -219,10 +224,13 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM case <-dstPeer.Stream.Context().Done(): log.Tracef("failed to forward message from peer [%s] to peer [%s]: destination peer disconnected", msg.Key, msg.RemoteKey) s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeDisconnected))) - + return fmt.Errorf("destination peer disconnected") case <-time.After(s.sendTimeout): dstPeer.Cancel() // cancel the peer context to trigger deregistration log.Tracef("failed to forward message from peer [%s] to peer [%s]: send timeout", msg.Key, msg.RemoteKey) s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeTimeout))) + return fmt.Errorf("sending message to peer timeout") } + + return nil }