Merge branch 'main' of github.com:netbirdio/netbird into feat/local-user-totp

This commit is contained in:
jnfrati
2026-04-29 12:25:58 +02:00
36 changed files with 1591 additions and 1252 deletions

View File

@@ -146,7 +146,11 @@ func (c *ClaimsExtractor) ToGroups(token *jwt.Token, claimName string) []string
userJWTGroups := make([]string, 0)
if claim, ok := claims[claimName]; ok {
if claimGroups, ok := claim.([]interface{}); ok {
switch claimGroups := claim.(type) {
case string:
// Some IdPs emit a single group claim as a string instead of an array.
userJWTGroups = append(userJWTGroups, claimGroups)
case []any:
for _, g := range claimGroups {
if group, ok := g.(string); ok {
userJWTGroups = append(userJWTGroups, group)
@@ -154,9 +158,11 @@ func (c *ClaimsExtractor) ToGroups(token *jwt.Token, claimName string) []string
log.Debugf("JWT claim %q contains a non-string group (type: %T): %v", claimName, g, g)
}
}
default:
log.Debugf("JWT claim %q is not a string or string array (type: %T): %v", claimName, claim, claim)
}
} else {
log.Debugf("JWT claim %q is not a string array", claimName)
log.Debugf("JWT claim %q is missing", claimName)
}
return userJWTGroups

View File

@@ -249,6 +249,15 @@ func TestClaimsExtractor_ToGroups(t *testing.T) {
groupClaimName: "groups",
expectedGroups: []string{},
},
{
name: "extracts single group string from claim",
claims: jwt.MapClaims{
"sub": "user-123",
"groups": "admin",
},
groupClaimName: "groups",
expectedGroups: []string{"admin"},
},
{
name: "handles custom claim name",
claims: jwt.MapClaims{

View File

@@ -252,21 +252,19 @@ func (c *GrpcClient) handleJobStream(
c.notifyDisconnected(err)
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return err
case codes.Unimplemented:
log.Warn("Job feature is not supported by the current management server version. " +
"Please update the management service to use this feature.")
return nil
default:
c.notifyDisconnected(err)
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
c.notifyDisconnected(err)
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
}

View File

@@ -2921,6 +2921,7 @@ components:
- okta
- pocketid
- microsoft
- adfs
example: oidc
IdentityProvider:
type: object

View File

@@ -518,6 +518,7 @@ const (
IdentityProviderTypeOkta IdentityProviderType = "okta"
IdentityProviderTypePocketid IdentityProviderType = "pocketid"
IdentityProviderTypeZitadel IdentityProviderType = "zitadel"
IdentityProviderTypeAdfs IdentityProviderType = "adfs"
)
// Valid indicates whether the value is a known member of the IdentityProviderType enum.
@@ -537,6 +538,8 @@ func (e IdentityProviderType) Valid() bool {
return true
case IdentityProviderTypeZitadel:
return true
case IdentityProviderTypeAdfs:
return true
default:
return false
}

View File

@@ -8,10 +8,7 @@ import (
log "github.com/sirupsen/logrus"
)
const (
// TODO: make it configurable, the manager should validate all configurable parameters
reconnectingTimeout = 60 * time.Second
)
const defaultMaxBackoffInterval = 60 * time.Second
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
type Guard struct {
@@ -19,14 +16,23 @@ type Guard struct {
OnNewRelayClient chan *Client
OnReconnected chan struct{}
serverPicker *ServerPicker
// maxBackoffInterval caps the exponential backoff between reconnect
// attempts.
maxBackoffInterval time.Duration
}
// NewGuard creates a new guard for the relay client.
func NewGuard(sp *ServerPicker) *Guard {
// NewGuard creates a new guard for the relay client. A non-positive
// maxBackoffInterval falls back to defaultMaxBackoffInterval.
func NewGuard(sp *ServerPicker, maxBackoffInterval time.Duration) *Guard {
if maxBackoffInterval <= 0 {
maxBackoffInterval = defaultMaxBackoffInterval
}
g := &Guard{
OnNewRelayClient: make(chan *Client, 1),
OnReconnected: make(chan struct{}, 1),
serverPicker: sp,
OnNewRelayClient: make(chan *Client, 1),
OnReconnected: make(chan struct{}, 1),
serverPicker: sp,
maxBackoffInterval: maxBackoffInterval,
}
return g
}
@@ -49,7 +55,7 @@ func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
}
// start a ticker to pick a new server
ticker := exponentTicker(ctx)
ticker := g.exponentTicker(ctx)
defer ticker.Stop()
for {
@@ -125,11 +131,11 @@ func (g *Guard) notifyReconnected() {
}
}
func exponentTicker(ctx context.Context) *backoff.Ticker {
func (g *Guard) exponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
Multiplier: 2,
MaxInterval: reconnectingTimeout,
MaxInterval: g.maxBackoffInterval,
Clock: backoff.SystemClock,
}, ctx)

View File

@@ -39,6 +39,15 @@ func NewRelayTrack() *RelayTrack {
type OnServerCloseListener func()
// ManagerOption configures a Manager at construction time.
type ManagerOption func(*Manager)
// WithMaxBackoffInterval caps the exponential backoff between reconnect
// attempts to the home relay. A non-positive value keeps the default.
func WithMaxBackoffInterval(d time.Duration) ManagerOption {
return func(m *Manager) { m.maxBackoffInterval = d }
}
// Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
// and automatically reconnect to them in case disconnection.
// The manager also manage temporary relay connection. If a client wants to communicate with a client on a
@@ -64,12 +73,13 @@ type Manager struct {
onReconnectedListenerFn func()
listenerLock sync.Mutex
mtu uint16
mtu uint16
maxBackoffInterval time.Duration
}
// NewManager creates a new manager instance.
// The serverURL address can be empty. In this case, the manager will not serve.
func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16) *Manager {
func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16, opts ...ManagerOption) *Manager {
tokenStore := &relayAuth.TokenStore{}
m := &Manager{
@@ -86,8 +96,11 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]*list.List),
}
for _, opt := range opts {
opt(m)
}
m.serverPicker.ServerURLs.Store(serverURLs)
m.reconnectGuard = NewGuard(m.serverPicker)
m.reconnectGuard = NewGuard(m.serverPicker, m.maxBackoffInterval)
return m
}
@@ -290,19 +303,36 @@ func (m *Manager) onServerConnected() {
go m.onReconnectedListenerFn()
}
// onServerDisconnected start to reconnection for home server only
// onServerDisconnected handles relay disconnect events. For the home server it
// starts the reconnect guard. For foreign servers it evicts the now-dead client
// from the cache so the next OpenConn builds a fresh one instead of reusing a
// closed client.
func (m *Manager) onServerDisconnected(serverAddress string) {
m.relayClientMu.Lock()
if serverAddress == m.relayClient.connectionURL {
isHome := m.relayClient != nil && serverAddress == m.relayClient.connectionURL
if isHome {
go func(client *Client) {
m.reconnectGuard.StartReconnectTrys(m.ctx, client)
}(m.relayClient)
}
m.relayClientMu.Unlock()
if !isHome {
m.evictForeignRelay(serverAddress)
}
m.notifyOnDisconnectListeners(serverAddress)
}
func (m *Manager) evictForeignRelay(serverAddress string) {
m.relayClientsMutex.Lock()
defer m.relayClientsMutex.Unlock()
if _, ok := m.relayClients[serverAddress]; ok {
delete(m.relayClients, serverAddress)
log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
}
}
func (m *Manager) listenGuardEvent(ctx context.Context) {
for {
select {

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"fmt"
"testing"
"time"
@@ -360,7 +361,8 @@ func TestAutoReconnect(t *testing.T) {
t.Fatalf("failed to serve manager: %s", err)
}
clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU)
clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU,
WithMaxBackoffInterval(2*time.Second))
err = clientAlice.Serve()
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
@@ -384,7 +386,9 @@ func TestAutoReconnect(t *testing.T) {
}
log.Infof("waiting for reconnection")
time.Sleep(reconnectingTimeout + 1*time.Second)
if err := waitForReady(ctx, clientAlice, 15*time.Second); err != nil {
t.Fatalf("manager did not reconnect: %s", err)
}
log.Infof("reopent the connection")
_, err = clientAlice.OpenConn(ctx, ra, "bob")
@@ -393,6 +397,21 @@ func TestAutoReconnect(t *testing.T) {
}
}
func waitForReady(ctx context.Context, m *Manager, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if m.Ready() {
return nil
}
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("manager not ready within %s", timeout)
}
func TestNotifierDoubleAdd(t *testing.T) {
ctx := context.Background()