From 551455f3143a5fc5589f0436a6a7de460d998a4a Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Thu, 30 Mar 2023 14:31:52 +0200 Subject: [PATCH] Handle keep alive in signal server --- client/internal/engine.go | 1 - keepalive/client.go | 5 +++++ keepalive/keep_alive.go | 5 +++-- management/client/grpc.go | 9 +++------ signal/client/grpc.go | 16 ++++++++++++++-- 5 files changed, 25 insertions(+), 11 deletions(-) create mode 100644 keepalive/client.go diff --git a/client/internal/engine.go b/client/internal/engine.go index 9cab064b1..e7c8aa4df 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -888,7 +888,6 @@ func (e *Engine) receiveSignalEvents() { err := e.signal.Receive(func(msg *sProto.Message) error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() - conn := e.peerConns[msg.Key] if conn == nil { return fmt.Errorf("wrongly addressed message %s", msg.Key) diff --git a/keepalive/client.go b/keepalive/client.go new file mode 100644 index 000000000..1bfcf2b55 --- /dev/null +++ b/keepalive/client.go @@ -0,0 +1,5 @@ +package keepalive + +func IsKeepAliveMsg(body []byte) bool { + return len(body) == 0 +} diff --git a/keepalive/keep_alive.go b/keepalive/keep_alive.go index a1182bd45..c961dbe1d 100644 --- a/keepalive/keep_alive.go +++ b/keepalive/keep_alive.go @@ -11,8 +11,9 @@ import ( ) const ( + GrpcVersionHeaderKey = "version" + reversProxyHeaderKey = "x-netbird-peer" - grpcVersionHeaderKey = "version" keepAliveInterval = 30 * time.Second ) @@ -112,7 +113,7 @@ func (k *KeepAlive) keepAliveIsSupported(ctx context.Context) (string, bool) { return "", false } - if len(md.Get(grpcVersionHeaderKey)) == 0 { + if len(md.Get(GrpcVersionHeaderKey)) == 0 { log.Debugf("version info not found") return "", false } diff --git a/management/client/grpc.go b/management/client/grpc.go index 254c2693f..5c62f68a1 100644 --- a/management/client/grpc.go +++ b/management/client/grpc.go @@ -24,6 +24,7 @@ import ( "github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/encryption" + appKeepAlive "github.com/netbirdio/netbird/keepalive" "github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/version" ) @@ -69,7 +70,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE realClient := proto.NewManagementServiceClient(conn) - md := metadata.Pairs("version", version.NetbirdVersion()) + md := metadata.Pairs(appKeepAlive.GrpcVersionHeaderKey, version.NetbirdVersion()) ctx = metadata.NewOutgoingContext(ctx, md) return &GrpcClient{ @@ -252,7 +253,7 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se return err } - if c.isKeepAliveMsg(update.Body) { + if appKeepAlive.IsKeepAliveMsg(update.Body) { continue } @@ -396,10 +397,6 @@ func (c *GrpcClient) notifyConnected() { c.connStateCallback.MarkManagementConnected() } -func (c *GrpcClient) isKeepAliveMsg(body []byte) bool { - return len(body) == 0 -} - func infoToMetaData(info *system.Info) *proto.PeerSystemMeta { if info == nil { return nil diff --git a/signal/client/grpc.go b/signal/client/grpc.go index 08430e8ef..81bff7c2d 100644 --- a/signal/client/grpc.go +++ b/signal/client/grpc.go @@ -21,7 +21,9 @@ import ( "google.golang.org/grpc/status" "github.com/netbirdio/netbird/encryption" + appKeepAlive "github.com/netbirdio/netbird/keepalive" "github.com/netbirdio/netbird/signal/proto" + "github.com/netbirdio/netbird/version" ) const defaultSendTimeout = 5 * time.Second @@ -73,6 +75,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo sigCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + conn, err := grpc.DialContext( sigCtx, addr, @@ -211,9 +214,12 @@ func (c *GrpcClient) getStreamStatusChan() <-chan struct{} { func (c *GrpcClient) connect(ctx context.Context, key string) (proto.SignalExchange_ConnectStreamClient, error) { c.stream = nil - // add key fingerprint to the request header to be identified on the server side - md := metadata.New(map[string]string{proto.HeaderId: key}) + md := metadata.New(map[string]string{ + proto.HeaderId: key, // add key fingerprint to the request header to be identified on the server side + appKeepAlive.GrpcVersionHeaderKey: version.NetbirdVersion(), // add version info to ensure keep alive is supported + }) metaCtx := metadata.NewOutgoingContext(ctx, md) + stream, err := c.realClient.ConnectStream(metaCtx, grpc.WaitForReady(true)) c.stream = stream if err != nil { @@ -366,6 +372,12 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient, } else if err != nil { return err } + + if appKeepAlive.IsKeepAliveMsg(msg.Body) { + log.Printf("received keepalive") + continue + } + log.Tracef("received a new message from Peer [fingerprint: %s]", msg.Key) decryptedMessage, err := c.decryptMessage(msg)