mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-20 15:49:55 +00:00
Enhance ICE handshake with session ID management and improve message handling
This commit is contained in:
@@ -45,19 +45,10 @@ type GrpcClient struct {
|
||||
connStateCallbackLock sync.RWMutex
|
||||
|
||||
onReconnectedListenerFn func()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) StreamConnected() bool {
|
||||
return c.status == StreamConnected
|
||||
}
|
||||
|
||||
func (c *GrpcClient) GetStatus() Status {
|
||||
return c.status
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
func (c *GrpcClient) Close() error {
|
||||
return c.signalConn.Close()
|
||||
decryptionWorker *Worker
|
||||
decryptionWorkerCancel context.CancelFunc
|
||||
decryptionWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -93,6 +84,25 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) StreamConnected() bool {
|
||||
return c.status == StreamConnected
|
||||
}
|
||||
|
||||
func (c *GrpcClient) GetStatus() Status {
|
||||
return c.status
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
func (c *GrpcClient) Close() error {
|
||||
if c.decryptionWorkerCancel != nil {
|
||||
c.decryptionWorkerCancel()
|
||||
}
|
||||
c.decryptionWg.Wait()
|
||||
c.decryptionWorker = nil
|
||||
|
||||
return c.signalConn.Close()
|
||||
}
|
||||
|
||||
// SetConnStateListener set the ConnStateNotifier
|
||||
func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) {
|
||||
c.connStateCallbackLock.Lock()
|
||||
@@ -148,8 +158,12 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
|
||||
|
||||
log.Infof("connected to the Signal Service stream")
|
||||
c.notifyConnected()
|
||||
|
||||
// Start worker pool if not already started
|
||||
c.startEncryptionWorker(ctx, msgHandler)
|
||||
|
||||
// start receiving messages from the Signal stream (from other peers through signal)
|
||||
err = c.receive(stream, msgHandler)
|
||||
err = c.receive(stream)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
|
||||
@@ -174,6 +188,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyStreamDisconnected() {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
@@ -382,11 +397,11 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
}
|
||||
|
||||
// receive receives messages from other peers coming through the Signal Exchange
|
||||
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
// and distributes them to worker threads for processing
|
||||
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error {
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
// Handle errors immediately
|
||||
switch s, ok := status.FromError(err); {
|
||||
case ok && s.Code() == codes.Canceled:
|
||||
log.Debugf("stream canceled (usually indicates shutdown)")
|
||||
@@ -398,24 +413,36 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
log.Debugf("Signal Service stream closed by server")
|
||||
return err
|
||||
case err != nil:
|
||||
log.Errorf("Stream receive error: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Tracef("received a new message from Peer [fingerprint: %s]", msg.Key)
|
||||
|
||||
decryptedMessage, err := c.decryptMessage(msg)
|
||||
if err != nil {
|
||||
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = msgHandler(decryptedMessage)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
// todo send something??
|
||||
if err := c.decryptionWorker.AddMsg(msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GrpcClient) startEncryptionWorker(ctx context.Context, handler func(msg *proto.Message) error) {
|
||||
if c.decryptionWorker != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.decryptionWorker = NewWorker(c.decryptMessage, handler)
|
||||
c.decryptionWg.Add(1)
|
||||
go func() {
|
||||
workerCtx, workerCancel := context.WithCancel(ctx)
|
||||
defer workerCancel()
|
||||
|
||||
c.decryptionWorker.Work(workerCtx)
|
||||
c.decryptionWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyDisconnected(err error) {
|
||||
c.connStateCallbackLock.RLock()
|
||||
defer c.connStateCallbackLock.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user