Handle keep alive in signal server

This commit is contained in:
Zoltan Papp
2023-03-30 14:31:52 +02:00
parent a6431e053b
commit 551455f314
5 changed files with 25 additions and 11 deletions

View File

@@ -888,7 +888,6 @@ func (e *Engine) receiveSignalEvents() {
err := e.signal.Receive(func(msg *sProto.Message) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
conn := e.peerConns[msg.Key]
if conn == nil {
return fmt.Errorf("wrongly addressed message %s", msg.Key)

5
keepalive/client.go Normal file
View File

@@ -0,0 +1,5 @@
package keepalive
func IsKeepAliveMsg(body []byte) bool {
return len(body) == 0
}

View File

@@ -11,8 +11,9 @@ import (
)
const (
GrpcVersionHeaderKey = "version"
reversProxyHeaderKey = "x-netbird-peer"
grpcVersionHeaderKey = "version"
keepAliveInterval = 30 * time.Second
)
@@ -112,7 +113,7 @@ func (k *KeepAlive) keepAliveIsSupported(ctx context.Context) (string, bool) {
return "", false
}
if len(md.Get(grpcVersionHeaderKey)) == 0 {
if len(md.Get(GrpcVersionHeaderKey)) == 0 {
log.Debugf("version info not found")
return "", false
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/netbirdio/netbird/client/system"
"github.com/netbirdio/netbird/encryption"
appKeepAlive "github.com/netbirdio/netbird/keepalive"
"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/version"
)
@@ -69,7 +70,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
realClient := proto.NewManagementServiceClient(conn)
md := metadata.Pairs("version", version.NetbirdVersion())
md := metadata.Pairs(appKeepAlive.GrpcVersionHeaderKey, version.NetbirdVersion())
ctx = metadata.NewOutgoingContext(ctx, md)
return &GrpcClient{
@@ -252,7 +253,7 @@ func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, se
return err
}
if c.isKeepAliveMsg(update.Body) {
if appKeepAlive.IsKeepAliveMsg(update.Body) {
continue
}
@@ -396,10 +397,6 @@ func (c *GrpcClient) notifyConnected() {
c.connStateCallback.MarkManagementConnected()
}
func (c *GrpcClient) isKeepAliveMsg(body []byte) bool {
return len(body) == 0
}
func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
if info == nil {
return nil

View File

@@ -21,7 +21,9 @@ import (
"google.golang.org/grpc/status"
"github.com/netbirdio/netbird/encryption"
appKeepAlive "github.com/netbirdio/netbird/keepalive"
"github.com/netbirdio/netbird/signal/proto"
"github.com/netbirdio/netbird/version"
)
const defaultSendTimeout = 5 * time.Second
@@ -73,6 +75,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
sigCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(
sigCtx,
addr,
@@ -211,9 +214,12 @@ func (c *GrpcClient) getStreamStatusChan() <-chan struct{} {
func (c *GrpcClient) connect(ctx context.Context, key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil
// add key fingerprint to the request header to be identified on the server side
md := metadata.New(map[string]string{proto.HeaderId: key})
md := metadata.New(map[string]string{
proto.HeaderId: key, // add key fingerprint to the request header to be identified on the server side
appKeepAlive.GrpcVersionHeaderKey: version.NetbirdVersion(), // add version info to ensure keep alive is supported
})
metaCtx := metadata.NewOutgoingContext(ctx, md)
stream, err := c.realClient.ConnectStream(metaCtx, grpc.WaitForReady(true))
c.stream = stream
if err != nil {
@@ -366,6 +372,12 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
} else if err != nil {
return err
}
if appKeepAlive.IsKeepAliveMsg(msg.Body) {
log.Printf("received keepalive")
continue
}
log.Tracef("received a new message from Peer [fingerprint: %s]", msg.Key)
decryptedMessage, err := c.decryptMessage(msg)