diff --git a/signal/server/signal.go b/signal/server/signal.go index fc9c19efd..02c49c31d 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -23,6 +23,8 @@ const ( labelTypeError = "error" labelTypeNotConnected = "not_connected" labelTypeNotRegistered = "not_registered" + labelTypeStream = "stream" + labelTypeMessage = "message" labelError = "error" labelErrorMissingId = "missing_id" @@ -62,6 +64,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. } if dstPeer, found := s.registry.Get(msg.RemoteKey); found { + start := time.Now() //forward the message to the target peer if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err) @@ -69,6 +72,7 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) } else { + s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeMessage))) s.metrics.MessagesForwarded.Add(context.Background(), 1) } } else { @@ -118,22 +122,21 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) } else if err != nil { return err } - start := time.Now() log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey) // lookup the target peer where the message is going to if dstPeer, found := s.registry.Get(msg.RemoteKey); found { + start := time.Now() //forward the message to the target peer if err := dstPeer.Stream.Send(msg); err != nil { log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err) //todo respond to the sender? - - // in milliseconds - s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6) - s.metrics.MessagesForwarded.Add(stream.Context(), 1) - } else { s.metrics.MessageForwardFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelType, labelTypeError))) + } else { + // in milliseconds + s.metrics.MessageForwardLatency.Record(stream.Context(), float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream))) + s.metrics.MessagesForwarded.Add(stream.Context(), 1) } } else { log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)