diff --git a/client/cmd/service.go b/client/cmd/service.go index 855eb30fa..3560088a7 100644 --- a/client/cmd/service.go +++ b/client/cmd/service.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "sync" "github.com/kardianos/service" log "github.com/sirupsen/logrus" @@ -13,10 +14,11 @@ import ( ) type program struct { - ctx context.Context - cancel context.CancelFunc - serv *grpc.Server - serverInstance *server.Server + ctx context.Context + cancel context.CancelFunc + serv *grpc.Server + serverInstance *server.Server + serverInstanceMu sync.Mutex } func newProgram(ctx context.Context, cancel context.CancelFunc) *program { diff --git a/client/cmd/service_controller.go b/client/cmd/service_controller.go index 86546e31c..761c86628 100644 --- a/client/cmd/service_controller.go +++ b/client/cmd/service_controller.go @@ -61,7 +61,9 @@ func (p *program) Start(svc service.Service) error { } proto.RegisterDaemonServiceServer(p.serv, serverInstance) + p.serverInstanceMu.Lock() p.serverInstance = serverInstance + p.serverInstanceMu.Unlock() log.Printf("started daemon server: %v", split[1]) if err := p.serv.Serve(listen); err != nil { @@ -72,6 +74,7 @@ func (p *program) Start(svc service.Service) error { } func (p *program) Stop(srv service.Service) error { + p.serverInstanceMu.Lock() if p.serverInstance != nil { in := new(proto.DownRequest) _, err := p.serverInstance.Down(p.ctx, in) @@ -79,6 +82,7 @@ func (p *program) Stop(srv service.Service) error { log.Errorf("failed to stop daemon: %v", err) } } + p.serverInstanceMu.Unlock() p.cancel() diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go index e0883715a..8a2e65382 100644 --- a/client/iface/wgproxy/bind/proxy.go +++ b/client/iface/wgproxy/bind/proxy.go @@ -2,6 +2,7 @@ package bind import ( "context" + "errors" "fmt" "net" "net/netip" @@ -94,7 +95,10 @@ func (p *ProxyBind) close() error { p.Bind.RemoveEndpoint(p.wgAddr) - return p.remoteConn.Close() + if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) { + return rErr + } + return nil } func (p *ProxyBind) proxyToLocal(ctx context.Context) { diff --git a/client/iface/wgproxy/ebpf/wrapper.go b/client/iface/wgproxy/ebpf/wrapper.go index efd5fd946..54cab4e1b 100644 --- a/client/iface/wgproxy/ebpf/wrapper.go +++ b/client/iface/wgproxy/ebpf/wrapper.go @@ -77,7 +77,7 @@ func (e *ProxyWrapper) CloseConn() error { e.cancel() - if err := e.remoteConn.Close(); err != nil { + if err := e.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { return fmt.Errorf("failed to close remote conn: %w", err) } return nil diff --git a/client/iface/wgproxy/udp/proxy.go b/client/iface/wgproxy/udp/proxy.go index 200d961f3..ba0004b8a 100644 --- a/client/iface/wgproxy/udp/proxy.go +++ b/client/iface/wgproxy/udp/proxy.go @@ -116,7 +116,7 @@ func (p *WGUDPProxy) close() error { p.cancel() var result *multierror.Error - if err := p.remoteConn.Close(); err != nil { + if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { result = multierror.Append(result, fmt.Errorf("remote conn: %s", err)) } diff --git a/client/internal/connect.go b/client/internal/connect.go index bcc9d17a3..dff44f1d2 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -207,7 +207,8 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold c.statusRecorder.MarkSignalDisconnected(nil) defer func() { - c.statusRecorder.MarkSignalDisconnected(state.err) + _, err := state.Status() + c.statusRecorder.MarkSignalDisconnected(err) }() // with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 84a8c221f..81c456db7 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -442,7 +442,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { if conn.iceP2PIsActive() { conn.log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnPriority) - conn.wgProxyRelay = wgProxy + conn.setRelayedProxy(wgProxy) conn.statusRelay.Set(StatusConnected) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) return @@ -465,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { wgConfigWorkaround() conn.currentConnPriority = connPriorityRelay conn.statusRelay.Set(StatusConnected) - conn.wgProxyRelay = wgProxy + conn.setRelayedProxy(wgProxy) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.log.Infof("start to communicate with peer via relay") conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr) @@ -736,6 +736,15 @@ func (conn *Conn) logTraceConnState() { } } +func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) { + if conn.wgProxyRelay != nil { + if err := conn.wgProxyRelay.CloseConn(); err != nil { + conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err) + } + } + conn.wgProxyRelay = proxy +} + func isController(config ConnConfig) bool { return config.LocalKey > config.Key } diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 241dfabbb..0444dc60b 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -67,7 +67,7 @@ func (s *State) DeleteRoute(network string) { func (s *State) GetRoutes() map[string]struct{} { s.Mux.RLock() defer s.Mux.RUnlock() - return s.routes + return maps.Clone(s.routes) } // LocalPeerState contains the latest state of the local peer diff --git a/management/server/account.go b/management/server/account.go index 62e23beec..1dd5ef840 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -2034,7 +2034,7 @@ func (am *DefaultAccountManager) syncJWTGroups(ctx context.Context, accountID st return fmt.Errorf("error getting user: %w", err) } - groups, err := transaction.GetAccountGroups(ctx, accountID) + groups, err := transaction.GetAccountGroups(ctx, LockingStrengthShare, accountID) if err != nil { return fmt.Errorf("error getting account groups: %w", err) } @@ -2064,7 +2064,7 @@ func (am *DefaultAccountManager) syncJWTGroups(ctx context.Context, accountID st // Propagate changes to peers if group propagation is enabled if settings.GroupsPropagationEnabled { - groups, err = transaction.GetAccountGroups(ctx, accountID) + groups, err = transaction.GetAccountGroups(ctx, LockingStrengthShare, accountID) if err != nil { return fmt.Errorf("error getting account groups: %w", err) } @@ -2134,7 +2134,7 @@ func (am *DefaultAccountManager) syncJWTGroups(ctx context.Context, accountID st if settings.GroupsPropagationEnabled { account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) if err != nil { - return fmt.Errorf("error getting account: %w", err) + return status.NewGetAccountError(err) } if areGroupChangesAffectPeers(account, addNewGroups) || areGroupChangesAffectPeers(account, removeOldGroups) { @@ -2295,12 +2295,12 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID account, err := am.Store.GetAccount(ctx, accountID) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, status.NewGetAccountError(err) } peer, netMap, postureChecks, err := am.SyncPeer(ctx, PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, account) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err) } err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, account) @@ -2319,7 +2319,7 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account account, err := am.Store.GetAccount(ctx, accountID) if err != nil { - return err + return status.NewGetAccountError(err) } err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account) diff --git a/management/server/account_test.go b/management/server/account_test.go index 6a2d85fe8..fdf004a3b 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -2773,7 +2773,7 @@ func TestAccount_SetJWTGroups(t *testing.T) { err = manager.syncJWTGroups(context.Background(), "accountID", claims) assert.NoError(t, err, "unable to sync jwt groups") - groups, err := manager.Store.GetAccountGroups(context.Background(), "accountID") + groups, err := manager.Store.GetAccountGroups(context.Background(), LockingStrengthShare, "accountID") assert.NoError(t, err) assert.Len(t, groups, 3, "new group3 should be added") diff --git a/management/server/group.go b/management/server/group.go index bdb569e37..b2ec88cc0 100644 --- a/management/server/group.go +++ b/management/server/group.go @@ -59,7 +59,7 @@ func (am *DefaultAccountManager) GetAllGroups(ctx context.Context, accountID, us return nil, err } - return am.Store.GetAccountGroups(ctx, accountID) + return am.Store.GetAccountGroups(ctx, LockingStrengthShare, accountID) } // GetGroupByName filters all groups in an account by name and returns the one with the most peers diff --git a/management/server/group/group.go b/management/server/group/group.go index d293e1afc..e98e5ecc4 100644 --- a/management/server/group/group.go +++ b/management/server/group/group.go @@ -49,3 +49,8 @@ func (g *Group) Copy() *Group { func (g *Group) HasPeers() bool { return len(g.Peers) > 0 } + +// IsGroupAll checks if the group is a default "All" group. +func (g *Group) IsGroupAll() bool { + return g.Name == "All" +} diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index efe088b27..9c12336f8 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -180,6 +180,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP) if err != nil { + log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err) return mapError(ctx, err) } @@ -207,6 +208,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi // handleUpdates sends updates to the connected peer until the updates channel is closed. func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { + log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String()) for { select { // condition when there are some updates @@ -260,10 +262,15 @@ func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, accountID string, p unlock := s.acquirePeerLockByUID(ctx, peer.Key) defer unlock() - _ = s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key) + err := s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key) + if err != nil { + log.WithContext(ctx).Errorf("failed to disconnect peer %s properly: %v", peer.Key, err) + } s.peersUpdateManager.CloseChannel(ctx, peer.ID) s.secretsManager.CancelRefresh(peer.ID) s.ephemeralManager.OnPeerDisconnected(ctx, peer) + + log.WithContext(ctx).Tracef("peer %s has been disconnected", peer.Key) } func (s *GRPCServer) validateToken(ctx context.Context, jwtToken string) (string, error) { diff --git a/management/server/peer.go b/management/server/peer.go index 9c5ab571b..9784650de 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -110,14 +110,16 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, account *Account) error { peer, err := account.FindPeerByPubKey(peerPubKey) if err != nil { - return err + return fmt.Errorf("failed to find peer by pub key: %w", err) } expired, err := am.updatePeerStatusAndLocation(ctx, peer, connected, realIP, account) if err != nil { - return err + return fmt.Errorf("failed to update peer status and location: %w", err) } + log.WithContext(ctx).Debugf("mark peer %s connected: %t", peer.ID, connected) + if peer.AddedWithSSOLogin() { if peer.LoginExpirationEnabled && account.Settings.PeerLoginExpirationEnabled { am.checkAndSchedulePeerLoginExpiration(ctx, account) @@ -168,7 +170,7 @@ func (am *DefaultAccountManager) updatePeerStatusAndLocation(ctx context.Context err := am.Store.SavePeerStatus(account.Id, peer.ID, *newStatus) if err != nil { - return false, err + return false, fmt.Errorf("failed to save peer status: %w", err) } return oldStatus.LoginExpired, nil @@ -587,7 +589,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) if err != nil { - return nil, nil, nil, fmt.Errorf("error getting account: %w", err) + return nil, nil, nil, status.NewGetAccountError(err) } allGroup, err := account.GetGroupAll() @@ -640,7 +642,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac if peer.UserID != "" { user, err := account.FindUser(peer.UserID) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to get user: %w", err) } err = checkIfPeerOwnerIsBlocked(peer, user) @@ -657,7 +659,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac if updated { err = am.Store.SavePeer(ctx, account.Id, peer) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to save peer: %w", err) } if sync.UpdateAccountPeers { @@ -667,7 +669,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac peerNotValid, isStatusChanged, err := am.integratedPeerValidator.IsNotValidPeer(ctx, account.Id, peer, account.GetPeerGroupsList(peer.ID), account.Settings.Extra) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to validate peer: %w", err) } var postureChecks []*posture.Checks @@ -685,7 +687,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync PeerSync, ac validPeersMap, err := am.GetValidatedPeers(account) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fmt.Errorf("failed to get validated peers: %w", err) } postureChecks = am.getPeerPostureChecks(account, peer) @@ -765,7 +767,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login PeerLogin) } } - groups, err := am.Store.GetAccountGroups(ctx, accountID) + groups, err := am.Store.GetAccountGroups(ctx, LockingStrengthShare, accountID) if err != nil { return nil, nil, nil, err } diff --git a/management/server/setupkey.go b/management/server/setupkey.go index 43b6e02c9..554c66ba4 100644 --- a/management/server/setupkey.go +++ b/management/server/setupkey.go @@ -4,18 +4,17 @@ import ( "context" "crypto/sha256" b64 "encoding/base64" - "fmt" "hash/fnv" + "slices" "strconv" "strings" "time" "unicode/utf8" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - "github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/status" + log "github.com/sirupsen/logrus" ) const ( @@ -229,32 +228,43 @@ func (am *DefaultAccountManager) CreateSetupKey(ctx context.Context, accountID s unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) defer unlock() - account, err := am.Store.GetAccount(ctx, accountID) + user, err := am.Store.GetUserByUserID(ctx, LockingStrengthShare, userID) if err != nil { return nil, err } - if err := validateSetupKeyAutoGroups(account, autoGroups); err != nil { - return nil, err + if user.AccountID != accountID { + return nil, status.NewUserNotPartOfAccountError() } - setupKey, plainKey := GenerateSetupKey(keyName, keyType, expiresIn, autoGroups, usageLimit, ephemeral) - account.SetupKeys[setupKey.Key] = setupKey - err = am.Store.SaveAccount(ctx, account) + if user.IsRegularUser() { + return nil, status.NewAdminPermissionError() + } + + var setupKey *SetupKey + var plainKey string + var eventsToStore []func() + + err = am.Store.ExecuteInTransaction(ctx, func(transaction Store) error { + if err = validateSetupKeyAutoGroups(ctx, transaction, accountID, autoGroups); err != nil { + return err + } + + setupKey, plainKey = GenerateSetupKey(keyName, keyType, expiresIn, autoGroups, usageLimit, ephemeral) + setupKey.AccountID = accountID + + events := am.prepareSetupKeyEvents(ctx, transaction, accountID, userID, autoGroups, nil, setupKey) + eventsToStore = append(eventsToStore, events...) + + return transaction.SaveSetupKey(ctx, LockingStrengthUpdate, setupKey) + }) if err != nil { - return nil, status.Errorf(status.Internal, "failed adding account key") + return nil, err } am.StoreEvent(ctx, userID, setupKey.Id, accountID, activity.SetupKeyCreated, setupKey.EventMeta()) - - for _, g := range setupKey.AutoGroups { - group := account.GetGroup(g) - if group != nil { - am.StoreEvent(ctx, userID, setupKey.Id, accountID, activity.GroupAddedToSetupKey, - map[string]any{"group": group.Name, "group_id": group.ID, "setupkey": setupKey.Name}) - } else { - log.WithContext(ctx).Errorf("group %s not found while saving setup key activity event of account %s", g, account.Id) - } + for _, storeEvent := range eventsToStore { + storeEvent() } // for the creation return the plain key to the caller @@ -268,43 +278,56 @@ func (am *DefaultAccountManager) CreateSetupKey(ctx context.Context, accountID s // (e.g. the key itself, creation date, ID, etc). // These properties are overwritten: Name, AutoGroups, Revoked. The rest is copied from the existing key. func (am *DefaultAccountManager) SaveSetupKey(ctx context.Context, accountID string, keyToSave *SetupKey, userID string) (*SetupKey, error) { - unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) - defer unlock() - if keyToSave == nil { return nil, status.Errorf(status.InvalidArgument, "provided setup key to update is nil") } - account, err := am.Store.GetAccount(ctx, accountID) + unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) + defer unlock() + + user, err := am.Store.GetUserByUserID(ctx, LockingStrengthShare, userID) if err != nil { return nil, err } + if user.AccountID != accountID { + return nil, status.NewUserNotPartOfAccountError() + } + + if user.IsRegularUser() { + return nil, status.NewAdminPermissionError() + } + var oldKey *SetupKey - for _, key := range account.SetupKeys { - if key.Id == keyToSave.Id { - oldKey = key.Copy() - break + var newKey *SetupKey + var eventsToStore []func() + + err = am.Store.ExecuteInTransaction(ctx, func(transaction Store) error { + if err = validateSetupKeyAutoGroups(ctx, transaction, accountID, keyToSave.AutoGroups); err != nil { + return err } - } - if oldKey == nil { - return nil, status.Errorf(status.NotFound, "setup key not found") - } - if err := validateSetupKeyAutoGroups(account, keyToSave.AutoGroups); err != nil { - return nil, err - } + oldKey, err = transaction.GetSetupKeyByID(ctx, LockingStrengthShare, accountID, keyToSave.Id) + if err != nil { + return err + } - // only auto groups, revoked status, and name can be updated for now - newKey := oldKey.Copy() - newKey.Name = keyToSave.Name - newKey.AutoGroups = keyToSave.AutoGroups - newKey.Revoked = keyToSave.Revoked - newKey.UpdatedAt = time.Now().UTC() + // only auto groups, revoked status, and name can be updated for now + newKey = oldKey.Copy() + newKey.Name = keyToSave.Name + newKey.AutoGroups = keyToSave.AutoGroups + newKey.Revoked = keyToSave.Revoked + newKey.UpdatedAt = time.Now().UTC() - account.SetupKeys[newKey.Key] = newKey + addedGroups := difference(newKey.AutoGroups, oldKey.AutoGroups) + removedGroups := difference(oldKey.AutoGroups, newKey.AutoGroups) - if err = am.Store.SaveAccount(ctx, account); err != nil { + events := am.prepareSetupKeyEvents(ctx, transaction, accountID, userID, addedGroups, removedGroups, oldKey) + eventsToStore = append(eventsToStore, events...) + + return transaction.SaveSetupKey(ctx, LockingStrengthUpdate, newKey) + }) + if err != nil { return nil, err } @@ -312,30 +335,9 @@ func (am *DefaultAccountManager) SaveSetupKey(ctx context.Context, accountID str am.StoreEvent(ctx, userID, newKey.Id, accountID, activity.SetupKeyRevoked, newKey.EventMeta()) } - defer func() { - addedGroups := difference(newKey.AutoGroups, oldKey.AutoGroups) - removedGroups := difference(oldKey.AutoGroups, newKey.AutoGroups) - for _, g := range removedGroups { - group := account.GetGroup(g) - if group != nil { - am.StoreEvent(ctx, userID, oldKey.Id, accountID, activity.GroupRemovedFromSetupKey, - map[string]any{"group": group.Name, "group_id": group.ID, "setupkey": newKey.Name}) - } else { - log.WithContext(ctx).Errorf("group %s not found while saving setup key activity event of account %s", g, account.Id) - } - - } - - for _, g := range addedGroups { - group := account.GetGroup(g) - if group != nil { - am.StoreEvent(ctx, userID, oldKey.Id, accountID, activity.GroupAddedToSetupKey, - map[string]any{"group": group.Name, "group_id": group.ID, "setupkey": newKey.Name}) - } else { - log.WithContext(ctx).Errorf("group %s not found while saving setup key activity event of account %s", g, account.Id) - } - } - }() + for _, storeEvent := range eventsToStore { + storeEvent() + } return newKey, nil } @@ -347,16 +349,15 @@ func (am *DefaultAccountManager) ListSetupKeys(ctx context.Context, accountID, u return nil, err } - if !user.IsAdminOrServiceUser() || user.AccountID != accountID { - return nil, status.NewUnauthorizedToViewSetupKeysError() + if user.AccountID != accountID { + return nil, status.NewUserNotPartOfAccountError() } - setupKeys, err := am.Store.GetAccountSetupKeys(ctx, LockingStrengthShare, accountID) - if err != nil { - return nil, err + if user.IsRegularUser() { + return nil, status.NewAdminPermissionError() } - return setupKeys, nil + return am.Store.GetAccountSetupKeys(ctx, LockingStrengthShare, accountID) } // GetSetupKey looks up a SetupKey by KeyID, returns NotFound error if not found. @@ -366,8 +367,12 @@ func (am *DefaultAccountManager) GetSetupKey(ctx context.Context, accountID, use return nil, err } - if !user.IsAdminOrServiceUser() || user.AccountID != accountID { - return nil, status.NewUnauthorizedToViewSetupKeysError() + if user.AccountID != accountID { + return nil, status.NewUserNotPartOfAccountError() + } + + if user.IsRegularUser() { + return nil, status.NewAdminPermissionError() } setupKey, err := am.Store.GetSetupKeyByID(ctx, LockingStrengthShare, keyID, accountID) @@ -387,21 +392,29 @@ func (am *DefaultAccountManager) GetSetupKey(ctx context.Context, accountID, use func (am *DefaultAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error { user, err := am.Store.GetUserByUserID(ctx, LockingStrengthShare, userID) if err != nil { - return fmt.Errorf("failed to get user: %w", err) + return err } - if !user.IsAdminOrServiceUser() || user.AccountID != accountID { - return status.NewUnauthorizedToViewSetupKeysError() + if user.AccountID != accountID { + return status.NewUserNotPartOfAccountError() } - deletedSetupKey, err := am.Store.GetSetupKeyByID(ctx, LockingStrengthShare, keyID, accountID) + if user.IsRegularUser() { + return status.NewAdminPermissionError() + } + + var deletedSetupKey *SetupKey + + err = am.Store.ExecuteInTransaction(ctx, func(transaction Store) error { + deletedSetupKey, err = transaction.GetSetupKeyByID(ctx, LockingStrengthShare, accountID, keyID) + if err != nil { + return err + } + + return transaction.DeleteSetupKey(ctx, LockingStrengthUpdate, accountID, keyID) + }) if err != nil { - return fmt.Errorf("failed to get setup key: %w", err) - } - - err = am.Store.DeleteSetupKey(ctx, accountID, keyID) - if err != nil { - return fmt.Errorf("failed to delete setup key: %w", err) + return err } am.StoreEvent(ctx, userID, keyID, accountID, activity.SetupKeyDeleted, deletedSetupKey.EventMeta()) @@ -409,15 +422,62 @@ func (am *DefaultAccountManager) DeleteSetupKey(ctx context.Context, accountID, return nil } -func validateSetupKeyAutoGroups(account *Account, autoGroups []string) error { - for _, group := range autoGroups { - g, ok := account.Groups[group] +func validateSetupKeyAutoGroups(ctx context.Context, transaction Store, accountID string, autoGroupIDs []string) error { + groups, err := transaction.GetGroupsByIDs(ctx, LockingStrengthShare, accountID, autoGroupIDs) + if err != nil { + return err + } + + for _, groupID := range autoGroupIDs { + group, ok := groups[groupID] if !ok { - return status.Errorf(status.NotFound, "group %s doesn't exist", group) + return status.Errorf(status.NotFound, "group not found: %s", groupID) } - if g.Name == "All" { - return status.Errorf(status.InvalidArgument, "can't add All group to the setup key") + + if group.IsGroupAll() { + return status.Errorf(status.InvalidArgument, "can't add 'All' group to the setup key") } } + return nil } + +// prepareSetupKeyEvents prepares a list of event functions to be stored. +func (am *DefaultAccountManager) prepareSetupKeyEvents(ctx context.Context, transaction Store, accountID, userID string, addedGroups, removedGroups []string, key *SetupKey) []func() { + var eventsToStore []func() + + modifiedGroups := slices.Concat(addedGroups, removedGroups) + groups, err := transaction.GetGroupsByIDs(ctx, LockingStrengthShare, accountID, modifiedGroups) + if err != nil { + log.WithContext(ctx).Errorf("issue getting groups for setup key events: %v", err) + return nil + } + + for _, g := range removedGroups { + group, ok := groups[g] + if !ok { + log.WithContext(ctx).Debugf("skipped adding group: %s GroupRemovedFromSetupKey activity: %v", g, err) + continue + } + + eventsToStore = append(eventsToStore, func() { + meta := map[string]any{"group": group.Name, "group_id": group.ID, "setupkey": key.Name} + am.StoreEvent(ctx, userID, key.Id, accountID, activity.GroupRemovedFromSetupKey, meta) + }) + } + + for _, g := range addedGroups { + group, ok := groups[g] + if !ok { + log.WithContext(ctx).Debugf("skipped adding group: %s GroupAddedToSetupKey activity: %v", g, err) + continue + } + + eventsToStore = append(eventsToStore, func() { + meta := map[string]any{"group": group.Name, "group_id": group.ID, "setupkey": key.Name} + am.StoreEvent(ctx, userID, key.Id, accountID, activity.GroupAddedToSetupKey, meta) + }) + } + + return eventsToStore +} diff --git a/management/server/sql_store.go b/management/server/sql_store.go index 96d9f27a5..11ce3a086 100644 --- a/management/server/sql_store.go +++ b/management/server/sql_store.go @@ -503,9 +503,10 @@ func (s *SqlStore) GetAccountBySetupKey(ctx context.Context, setupKey string) (* if result.Error != nil { if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, status.Errorf(status.NotFound, "account not found: index lookup failed") + return nil, status.NewSetupKeyNotFoundError(setupKey) } - return nil, status.NewSetupKeyNotFoundError(result.Error) + log.WithContext(ctx).Errorf("failed to get account by setup key from store: %v", result.Error) + return nil, status.Errorf(status.Internal, "failed to get account by setup key from store") } if key.AccountID == "" { @@ -586,15 +587,15 @@ func (s *SqlStore) GetAccountUsers(ctx context.Context, accountID string) ([]*Us return users, nil } -func (s *SqlStore) GetAccountGroups(ctx context.Context, accountID string) ([]*nbgroup.Group, error) { +func (s *SqlStore) GetAccountGroups(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbgroup.Group, error) { var groups []*nbgroup.Group - result := s.db.Find(&groups, accountIDCondition, accountID) + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Find(&groups, accountIDCondition, accountID) if result.Error != nil { if errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, status.Errorf(status.NotFound, "accountID not found: index lookup failed") } - log.WithContext(ctx).Errorf("error when getting groups from the store: %s", result.Error) - return nil, status.Errorf(status.Internal, "issue getting groups from store") + log.WithContext(ctx).Errorf("failed to get account groups from the store: %s", result.Error) + return nil, status.Errorf(status.Internal, "failed to get account groups from the store") } return groups, nil @@ -775,9 +776,10 @@ func (s *SqlStore) GetAccountIDBySetupKey(ctx context.Context, setupKey string) result := s.db.Model(&SetupKey{}).Select("account_id").Where(GetKeyQueryCondition(s), setupKey).First(&accountID) if result.Error != nil { if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return "", status.Errorf(status.NotFound, "account not found: index lookup failed") + return "", status.NewSetupKeyNotFoundError(setupKey) } - return "", status.NewSetupKeyNotFoundError(result.Error) + log.WithContext(ctx).Errorf("failed to get account ID by setup key from store: %v", result.Error) + return "", status.Errorf(status.Internal, "failed to get account ID by setup key from store") } if accountID == "" { @@ -1049,9 +1051,10 @@ func (s *SqlStore) GetSetupKeyBySecret(ctx context.Context, lockStrength Locking if result.Error != nil { if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, status.Errorf(status.NotFound, "setup key not found") + return nil, status.NewSetupKeyNotFoundError(key) } - return nil, status.NewSetupKeyNotFoundError(result.Error) + log.WithContext(ctx).Errorf("failed to get setup key by secret from store: %v", result.Error) + return nil, status.Errorf(status.Internal, "failed to get setup key by secret from store") } return &setupKey, nil } @@ -1069,7 +1072,7 @@ func (s *SqlStore) IncrementSetupKeyUsage(ctx context.Context, setupKeyID string } if result.RowsAffected == 0 { - return status.Errorf(status.NotFound, "setup key not found") + return status.NewSetupKeyNotFoundError(setupKeyID) } return nil @@ -1247,6 +1250,23 @@ func (s *SqlStore) GetGroupByName(ctx context.Context, lockStrength LockingStren return &group, nil } +// GetGroupsByIDs retrieves groups by their IDs and account ID. +func (s *SqlStore) GetGroupsByIDs(ctx context.Context, lockStrength LockingStrength, accountID string, groupIDs []string) (map[string]*nbgroup.Group, error) { + var groups []*nbgroup.Group + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Find(&groups, "account_id = ? AND id in ?", accountID, groupIDs) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to get groups by ID's from the store: %s", result.Error) + return nil, status.Errorf(status.Internal, "failed to get groups by ID's from the store") + } + + groupsMap := make(map[string]*nbgroup.Group) + for _, group := range groups { + groupsMap[group.ID] = group + } + + return groupsMap, nil +} + // SaveGroup saves a group to the store. func (s *SqlStore) SaveGroup(ctx context.Context, lockStrength LockingStrength, group *nbgroup.Group) error { result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Save(group) @@ -1288,12 +1308,57 @@ func (s *SqlStore) GetRouteByID(ctx context.Context, lockStrength LockingStrengt // GetAccountSetupKeys retrieves setup keys for an account. func (s *SqlStore) GetAccountSetupKeys(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*SetupKey, error) { - return getRecords[*SetupKey](s.db, lockStrength, accountID) + var setupKeys []*SetupKey + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}). + Find(&setupKeys, accountIDCondition, accountID) + if err := result.Error; err != nil { + log.WithContext(ctx).Errorf("failed to get setup keys from the store: %s", err) + return nil, status.Errorf(status.Internal, "failed to get setup keys from store") + } + + return setupKeys, nil } // GetSetupKeyByID retrieves a setup key by its ID and account ID. -func (s *SqlStore) GetSetupKeyByID(ctx context.Context, lockStrength LockingStrength, setupKeyID string, accountID string) (*SetupKey, error) { - return getRecordByID[SetupKey](s.db, lockStrength, setupKeyID, accountID) +func (s *SqlStore) GetSetupKeyByID(ctx context.Context, lockStrength LockingStrength, accountID, setupKeyID string) (*SetupKey, error) { + var setupKey *SetupKey + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}). + First(&setupKey, accountAndIDQueryCondition, accountID, setupKeyID) + if err := result.Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, status.NewSetupKeyNotFoundError(setupKeyID) + } + log.WithContext(ctx).Errorf("failed to get setup key from the store: %s", err) + return nil, status.Errorf(status.Internal, "failed to get setup key from store") + } + + return setupKey, nil +} + +// SaveSetupKey saves a setup key to the database. +func (s *SqlStore) SaveSetupKey(ctx context.Context, lockStrength LockingStrength, setupKey *SetupKey) error { + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Save(setupKey) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to save setup key to store: %s", result.Error) + return status.Errorf(status.Internal, "failed to save setup key to store") + } + + return nil +} + +// DeleteSetupKey deletes a setup key from the database. +func (s *SqlStore) DeleteSetupKey(ctx context.Context, lockStrength LockingStrength, accountID, keyID string) error { + result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Delete(&SetupKey{}, accountAndIDQueryCondition, accountID, keyID) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to delete setup key from store: %s", result.Error) + return status.Errorf(status.Internal, "failed to delete setup key from store") + } + + if result.RowsAffected == 0 { + return status.NewSetupKeyNotFoundError(keyID) + } + + return nil } // GetAccountNameServerGroups retrieves name server groups for an account. @@ -1306,10 +1371,6 @@ func (s *SqlStore) GetNameServerGroupByID(ctx context.Context, lockStrength Lock return getRecordByID[nbdns.NameServerGroup](s.db, lockStrength, nsGroupID, accountID) } -func (s *SqlStore) DeleteSetupKey(ctx context.Context, accountID, keyID string) error { - return deleteRecordByID[SetupKey](s.db, LockingStrengthUpdate, keyID, accountID) -} - // getRecords retrieves records from the database based on the account ID. func getRecords[T any](db *gorm.DB, lockStrength LockingStrength, accountID string) ([]T, error) { var record []T @@ -1342,21 +1403,3 @@ func getRecordByID[T any](db *gorm.DB, lockStrength LockingStrength, recordID, a } return &record, nil } - -// deleteRecordByID deletes a record by its ID and account ID from the database. -func deleteRecordByID[T any](db *gorm.DB, lockStrength LockingStrength, recordID, accountID string) error { - var record T - result := db.Clauses(clause.Locking{Strength: string(lockStrength)}).Delete(record, accountAndIDQueryCondition, accountID, recordID) - if err := result.Error; err != nil { - parts := strings.Split(fmt.Sprintf("%T", record), ".") - recordType := parts[len(parts)-1] - - return status.Errorf(status.Internal, "failed to delete %s from store: %v", recordType, err) - } - - if result.RowsAffected == 0 { - return status.Errorf(status.NotFound, "record not found") - } - - return nil -} diff --git a/management/server/sql_store_test.go b/management/server/sql_store_test.go index b371e2313..3f3b2a453 100644 --- a/management/server/sql_store_test.go +++ b/management/server/sql_store_test.go @@ -1274,7 +1274,7 @@ func Test_DeleteSetupKeySuccessfully(t *testing.T) { accountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" setupKeyID := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB" - err = store.DeleteSetupKey(context.Background(), accountID, setupKeyID) + err = store.DeleteSetupKey(context.Background(), LockingStrengthUpdate, accountID, setupKeyID) require.NoError(t, err) _, err = store.GetSetupKeyByID(context.Background(), LockingStrengthShare, setupKeyID, accountID) @@ -1290,6 +1290,6 @@ func Test_DeleteSetupKeyFailsForNonExistingKey(t *testing.T) { accountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" nonExistingKeyID := "non-existing-key-id" - err = store.DeleteSetupKey(context.Background(), accountID, nonExistingKeyID) + err = store.DeleteSetupKey(context.Background(), LockingStrengthUpdate, accountID, nonExistingKeyID) require.Error(t, err) } diff --git a/management/server/status/error.go b/management/server/status/error.go index a145edf80..f1f3f16e6 100644 --- a/management/server/status/error.go +++ b/management/server/status/error.go @@ -103,19 +103,29 @@ func NewPeerLoginExpiredError() error { } // NewSetupKeyNotFoundError creates a new Error with NotFound type for a missing setup key -func NewSetupKeyNotFoundError(err error) error { - return Errorf(NotFound, "setup key not found: %s", err) +func NewSetupKeyNotFoundError(setupKeyID string) error { + return Errorf(NotFound, "setup key: %s not found", setupKeyID) } func NewGetAccountFromStoreError(err error) error { return Errorf(Internal, "issue getting account from store: %s", err) } +// NewUserNotPartOfAccountError creates a new Error with PermissionDenied type for a user not being part of an account +func NewUserNotPartOfAccountError() error { + return Errorf(PermissionDenied, "user is not part of this account") +} + // NewGetUserFromStoreError creates a new Error with Internal type for an issue getting user from store func NewGetUserFromStoreError() error { return Errorf(Internal, "issue getting user from store") } +// NewAdminPermissionError creates a new Error with PermissionDenied type for actions requiring admin role. +func NewAdminPermissionError() error { + return Errorf(PermissionDenied, "admin role required to perform this action") +} + // NewStoreContextCanceledError creates a new Error with Internal type for a canceled store context func NewStoreContextCanceledError(duration time.Duration) error { return Errorf(Internal, "store access: context canceled after %v", duration) @@ -126,7 +136,7 @@ func NewInvalidKeyIDError() error { return Errorf(InvalidArgument, "invalid key ID") } -// NewUnauthorizedToViewSetupKeysError creates a new Error with Unauthorized type for an issue getting a setup key -func NewUnauthorizedToViewSetupKeysError() error { - return Errorf(Unauthorized, "only users with admin power can view setup keys") +// NewGetAccountError creates a new Error with Internal type for an issue getting account +func NewGetAccountError(err error) error { + return Errorf(Internal, "error getting account: %s", err) } diff --git a/management/server/store.go b/management/server/store.go index 7c288d19f..4ee76e797 100644 --- a/management/server/store.go +++ b/management/server/store.go @@ -70,9 +70,10 @@ type Store interface { DeleteHashedPAT2TokenIDIndex(hashedToken string) error DeleteTokenID2UserIDIndex(tokenID string) error - GetAccountGroups(ctx context.Context, accountID string) ([]*nbgroup.Group, error) + GetAccountGroups(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbgroup.Group, error) GetGroupByID(ctx context.Context, lockStrength LockingStrength, groupID, accountID string) (*nbgroup.Group, error) GetGroupByName(ctx context.Context, lockStrength LockingStrength, groupName, accountID string) (*nbgroup.Group, error) + GetGroupsByIDs(ctx context.Context, lockStrength LockingStrength, accountID string, groupIDs []string) (map[string]*nbgroup.Group, error) SaveGroups(ctx context.Context, lockStrength LockingStrength, groups []*nbgroup.Group) error SaveGroup(ctx context.Context, lockStrength LockingStrength, group *nbgroup.Group) error @@ -96,7 +97,9 @@ type Store interface { GetSetupKeyBySecret(ctx context.Context, lockStrength LockingStrength, key string) (*SetupKey, error) IncrementSetupKeyUsage(ctx context.Context, setupKeyID string) error GetAccountSetupKeys(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*SetupKey, error) - GetSetupKeyByID(ctx context.Context, lockStrength LockingStrength, setupKeyID string, accountID string) (*SetupKey, error) + GetSetupKeyByID(ctx context.Context, lockStrength LockingStrength, accountID, setupKeyID string) (*SetupKey, error) + SaveSetupKey(ctx context.Context, lockStrength LockingStrength, setupKey *SetupKey) error + DeleteSetupKey(ctx context.Context, lockStrength LockingStrength, accountID, keyID string) error GetAccountRoutes(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*route.Route, error) GetRouteByID(ctx context.Context, lockStrength LockingStrength, routeID string, accountID string) (*route.Route, error) @@ -124,7 +127,6 @@ type Store interface { // This is also a method of metrics.DataSource interface. GetStoreEngine() StoreEngine ExecuteInTransaction(ctx context.Context, f func(store Store) error) error - DeleteSetupKey(ctx context.Context, accountID, keyID string) error } type StoreEngine string diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 59b6fd094..d338b84b1 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -96,9 +96,12 @@ func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { if channel, ok := p.peerChannels[peerID]; ok { delete(p.peerChannels, peerID) close(channel) + + log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) + return } - log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) + log.WithContext(ctx).Debugf("closing updates channel: peer %s has no channel", peerID) } // CloseChannels closes updates channel for each given peer diff --git a/management/server/user.go b/management/server/user.go index 9fdd3a6ee..5e0d9d034 100644 --- a/management/server/user.go +++ b/management/server/user.go @@ -9,14 +9,16 @@ import ( "time" "github.com/google/uuid" + log "github.com/sirupsen/logrus" + "github.com/netbirdio/netbird/management/server/activity" + nbContext "github.com/netbirdio/netbird/management/server/context" nbgroup "github.com/netbirdio/netbird/management/server/group" "github.com/netbirdio/netbird/management/server/idp" "github.com/netbirdio/netbird/management/server/integration_reference" "github.com/netbirdio/netbird/management/server/jwtclaims" nbpeer "github.com/netbirdio/netbird/management/server/peer" "github.com/netbirdio/netbird/management/server/status" - log "github.com/sirupsen/logrus" ) const ( @@ -103,6 +105,11 @@ func (u *User) IsAdminOrServiceUser() bool { return u.HasAdminPower() || u.IsServiceUser } +// IsRegularUser checks if the user is a regular user. +func (u *User) IsRegularUser() bool { + return !u.HasAdminPower() && !u.IsServiceUser +} + // ToUserInfo converts a User object to a UserInfo object. func (u *User) ToUserInfo(userData *idp.UserData, settings *Settings) (*UserInfo, error) { autoGroups := u.AutoGroups @@ -1100,6 +1107,9 @@ func (am *DefaultAccountManager) GetUsersFromAccount(ctx context.Context, accoun func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, account *Account, peers []*nbpeer.Peer) error { var peerIDs []string for _, peer := range peers { + // nolint:staticcheck + ctx = context.WithValue(ctx, nbContext.PeerIDKey, peer.Key) + if peer.Status.LoginExpired { continue } @@ -1107,8 +1117,11 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou peer.MarkLoginExpired(true) account.UpdatePeer(peer) if err := am.Store.SavePeerStatus(account.Id, peer.ID, *peer.Status); err != nil { - return err + return fmt.Errorf("failed saving peer status for peer %s: %s", peer.ID, err) } + + log.WithContext(ctx).Tracef("mark peer %s login expired", peer.ID) + am.StoreEvent( ctx, peer.UserID, peer.ID, account.Id, diff --git a/relay/client/client.go b/relay/client/client.go index a82a75453..154c1787f 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "io" "net" "sync" "time" @@ -449,11 +448,11 @@ func (c *Client) writeTo(connReference *Conn, id string, dstID []byte, payload [ conn, ok := c.conns[id] c.mu.Unlock() if !ok { - return 0, io.EOF + return 0, net.ErrClosed } if conn.conn != connReference { - return 0, io.EOF + return 0, net.ErrClosed } // todo: use buffer pool instead of create new transport msg. @@ -508,7 +507,7 @@ func (c *Client) closeConn(connReference *Conn, id string) error { container, ok := c.conns[id] if !ok { - return fmt.Errorf("connection already closed") + return net.ErrClosed } if container.conn != connReference { diff --git a/relay/client/conn.go b/relay/client/conn.go index b4ff903e8..fe1b6fb52 100644 --- a/relay/client/conn.go +++ b/relay/client/conn.go @@ -1,7 +1,6 @@ package client import ( - "io" "net" "time" ) @@ -40,7 +39,7 @@ func (c *Conn) Write(p []byte) (n int, err error) { func (c *Conn) Read(b []byte) (n int, err error) { msg, ok := <-c.messageChan if !ok { - return 0, io.EOF + return 0, net.ErrClosed } n = copy(b, msg.Payload) diff --git a/relay/server/listener/ws/conn.go b/relay/server/listener/ws/conn.go index c248963b9..12e721fdb 100644 --- a/relay/server/listener/ws/conn.go +++ b/relay/server/listener/ws/conn.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "net" "sync" "time" @@ -100,7 +99,7 @@ func (c *Conn) isClosed() bool { func (c *Conn) ioErrHandling(err error) error { if c.isClosed() { - return io.EOF + return net.ErrClosed } var wErr *websocket.CloseError @@ -108,7 +107,7 @@ func (c *Conn) ioErrHandling(err error) error { return err } if wErr.Code == websocket.StatusNormalClosure { - return io.EOF + return net.ErrClosed } return err } diff --git a/relay/server/peer.go b/relay/server/peer.go index a9c542f84..c909c35d5 100644 --- a/relay/server/peer.go +++ b/relay/server/peer.go @@ -2,7 +2,7 @@ package server import ( "context" - "io" + "errors" "net" "sync" "time" @@ -57,7 +57,7 @@ func (p *Peer) Work() { for { n, err := p.conn.Read(buf) if err != nil { - if err != io.EOF { + if !errors.Is(err, net.ErrClosed) { p.log.Errorf("failed to read message: %s", err) } return