[client,management] Feature/client service expose (#5411)

CLI: new expose command to publish a local port with flags for PIN, password, user groups, custom domain, name prefix and protocol (HTTP default).
Management/API: create/renew/stop expose sessions (streamed status), automatic naming/domain, TTL renewals, background expiration, new management RPCs and client methods.
UI/API: account settings now include peer_expose_enabled and peer_expose_groups; new activity codes for peer expose events.
This commit is contained in:
Maycon Santos
2026-02-24 10:02:16 +01:00
committed by GitHub
parent 37f025c966
commit 63c83aa8d2
44 changed files with 3867 additions and 422 deletions

View File

@@ -0,0 +1,301 @@
package grpc
import (
"context"
"regexp"
"sync"
"time"
pb "github.com/golang/protobuf/proto" // nolint
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/netbirdio/netbird/encryption"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy"
nbContext "github.com/netbirdio/netbird/management/server/context"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/shared/management/proto"
internalStatus "github.com/netbirdio/netbird/shared/management/status"
)
var pinRegexp = regexp.MustCompile(`^\d{6}$`)
const (
exposeTTL = 90 * time.Second
exposeReapInterval = 30 * time.Second
maxExposesPerPeer = 10
)
type activeExpose struct {
mu sync.Mutex
serviceID string
domain string
accountID string
peerID string
lastRenewed time.Time
}
func exposeKey(peerID, domain string) string {
return peerID + ":" + domain
}
// CreateExpose handles a peer request to create a new expose service.
func (s *Server) CreateExpose(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
exposeReq := &proto.ExposeServiceRequest{}
peerKey, err := s.parseRequest(ctx, req, exposeReq)
if err != nil {
return nil, err
}
accountID, peer, err := s.authenticateExposePeer(ctx, peerKey)
if err != nil {
return nil, err
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
if exposeReq.Protocol != proto.ExposeProtocol_EXPOSE_HTTP && exposeReq.Protocol != proto.ExposeProtocol_EXPOSE_HTTPS {
return nil, status.Errorf(codes.InvalidArgument, "only HTTP or HTTPS protocol are supported")
}
if exposeReq.Pin != "" && !pinRegexp.MatchString(exposeReq.Pin) {
return nil, status.Errorf(codes.InvalidArgument, "invalid pin: must be exactly 6 digits")
}
for _, g := range exposeReq.UserGroups {
if g == "" {
return nil, status.Errorf(codes.InvalidArgument, "user group name cannot be empty")
}
}
reverseProxyMgr := s.getReverseProxyManager()
if reverseProxyMgr == nil {
return nil, status.Errorf(codes.Internal, "reverse proxy manager not available")
}
if err := reverseProxyMgr.ValidateExposePermission(ctx, accountID, peer.ID); err != nil {
log.WithContext(ctx).Debugf("expose permission denied for peer %s: %v", peer.ID, err)
return nil, status.Errorf(codes.PermissionDenied, "permission denied")
}
serviceName, err := reverseproxy.GenerateExposeName(exposeReq.NamePrefix)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "generate service name: %v", err)
}
service := reverseproxy.FromExposeRequest(exposeReq, accountID, peer.ID, serviceName)
// Serialize the count check to prevent concurrent CreateExpose calls from
// exceeding maxExposesPerPeer. The lock is held only for the check; the
// actual service creation happens outside the lock.
s.exposeCreateMu.Lock()
if s.countPeerExposes(peer.ID) >= maxExposesPerPeer {
s.exposeCreateMu.Unlock()
return nil, status.Errorf(codes.ResourceExhausted, "peer has reached the maximum number of active expose sessions (%d)", maxExposesPerPeer)
}
s.exposeCreateMu.Unlock()
created, err := reverseProxyMgr.CreateServiceFromPeer(ctx, accountID, peer.ID, service)
if err != nil {
log.WithContext(ctx).Errorf("failed to create service from peer: %v", err)
return nil, status.Errorf(codes.Internal, "create service: %v", err)
}
key := exposeKey(peer.ID, created.Domain)
if _, loaded := s.activeExposes.LoadOrStore(key, &activeExpose{
serviceID: created.ID,
domain: created.Domain,
accountID: accountID,
peerID: peer.ID,
lastRenewed: time.Now(),
}); loaded {
s.deleteExposeService(ctx, accountID, peer.ID, created)
return nil, status.Errorf(codes.AlreadyExists, "peer already has an active expose session for this domain")
}
resp := &proto.ExposeServiceResponse{
ServiceName: created.Name,
ServiceUrl: "https://" + created.Domain,
Domain: created.Domain,
}
return s.encryptResponse(peerKey, resp)
}
// RenewExpose extends the TTL of an active expose session.
func (s *Server) RenewExpose(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
renewReq := &proto.RenewExposeRequest{}
peerKey, err := s.parseRequest(ctx, req, renewReq)
if err != nil {
return nil, err
}
_, peer, err := s.authenticateExposePeer(ctx, peerKey)
if err != nil {
return nil, err
}
key := exposeKey(peer.ID, renewReq.Domain)
val, ok := s.activeExposes.Load(key)
if !ok {
return nil, status.Errorf(codes.NotFound, "no active expose session for domain %s", renewReq.Domain)
}
expose := val.(*activeExpose)
expose.mu.Lock()
expose.lastRenewed = time.Now()
expose.mu.Unlock()
return s.encryptResponse(peerKey, &proto.RenewExposeResponse{})
}
// StopExpose terminates an active expose session.
func (s *Server) StopExpose(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
stopReq := &proto.StopExposeRequest{}
peerKey, err := s.parseRequest(ctx, req, stopReq)
if err != nil {
return nil, err
}
_, peer, err := s.authenticateExposePeer(ctx, peerKey)
if err != nil {
return nil, err
}
key := exposeKey(peer.ID, stopReq.Domain)
val, ok := s.activeExposes.LoadAndDelete(key)
if !ok {
return nil, status.Errorf(codes.NotFound, "no active expose session for domain %s", stopReq.Domain)
}
expose := val.(*activeExpose)
s.cleanupExpose(expose, false)
return s.encryptResponse(peerKey, &proto.StopExposeResponse{})
}
// StartExposeReaper starts a background goroutine that reaps expired expose sessions.
func (s *Server) StartExposeReaper(ctx context.Context) {
go func() {
ticker := time.NewTicker(exposeReapInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.reapExpiredExposes()
}
}
}()
}
func (s *Server) reapExpiredExposes() {
s.activeExposes.Range(func(key, val any) bool {
expose := val.(*activeExpose)
expose.mu.Lock()
expired := time.Since(expose.lastRenewed) > exposeTTL
expose.mu.Unlock()
if expired {
if _, deleted := s.activeExposes.LoadAndDelete(key); deleted {
log.Infof("reaping expired expose session for peer %s, domain %s", expose.peerID, expose.domain)
s.cleanupExpose(expose, true)
}
}
return true
})
}
func (s *Server) encryptResponse(peerKey wgtypes.Key, msg pb.Message) (*proto.EncryptedMessage, error) {
wgKey, err := s.secretsManager.GetWGKey()
if err != nil {
return nil, status.Errorf(codes.Internal, "internal error")
}
encryptedResp, err := encryption.EncryptMessage(peerKey, wgKey, msg)
if err != nil {
return nil, status.Errorf(codes.Internal, "encrypt response")
}
return &proto.EncryptedMessage{
WgPubKey: wgKey.PublicKey().String(),
Body: encryptedResp,
}, nil
}
func (s *Server) authenticateExposePeer(ctx context.Context, peerKey wgtypes.Key) (string, *nbpeer.Peer, error) {
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
if err != nil {
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
return "", nil, status.Errorf(codes.PermissionDenied, "peer is not registered")
}
return "", nil, status.Errorf(codes.Internal, "lookup account for peer")
}
peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
if err != nil {
return "", nil, status.Errorf(codes.PermissionDenied, "peer is not registered")
}
return accountID, peer, nil
}
func (s *Server) deleteExposeService(ctx context.Context, accountID, peerID string, service *reverseproxy.Service) {
reverseProxyMgr := s.getReverseProxyManager()
if reverseProxyMgr == nil {
return
}
if err := reverseProxyMgr.DeleteServiceFromPeer(ctx, accountID, peerID, service.ID); err != nil {
log.WithContext(ctx).Debugf("failed to delete expose service %s: %v", service.ID, err)
}
}
func (s *Server) cleanupExpose(expose *activeExpose, expired bool) {
bgCtx := context.Background()
reverseProxyMgr := s.getReverseProxyManager()
if reverseProxyMgr == nil {
log.Errorf("cannot cleanup exposed service %s: reverse proxy manager not available", expose.serviceID)
return
}
var err error
if expired {
err = reverseProxyMgr.ExpireServiceFromPeer(bgCtx, expose.accountID, expose.peerID, expose.serviceID)
} else {
err = reverseProxyMgr.DeleteServiceFromPeer(bgCtx, expose.accountID, expose.peerID, expose.serviceID)
}
if err != nil {
log.Errorf("failed to delete peer-exposed service %s: %v", expose.serviceID, err)
}
}
func (s *Server) countPeerExposes(peerID string) int {
count := 0
s.activeExposes.Range(func(_, val any) bool {
if expose := val.(*activeExpose); expose.peerID == peerID {
count++
}
return true
})
return count
}
func (s *Server) getReverseProxyManager() reverseproxy.Manager {
s.reverseProxyMu.RLock()
defer s.reverseProxyMu.RUnlock()
return s.reverseProxyManager
}
// SetReverseProxyManager sets the reverse proxy manager on the server.
func (s *Server) SetReverseProxyManager(mgr reverseproxy.Manager) {
s.reverseProxyMu.Lock()
defer s.reverseProxyMu.Unlock()
s.reverseProxyManager = mgr
}

View File

@@ -0,0 +1,242 @@
package grpc
import (
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy"
)
func TestPinValidation(t *testing.T) {
tests := []struct {
pin string
valid bool
}{
{"123456", true},
{"000000", true},
{"12345", false},
{"1234567", false},
{"abcdef", false},
{"12345a", false},
{"", false},
{"12 345", false},
}
for _, tt := range tests {
assert.Equal(t, tt.valid, pinRegexp.MatchString(tt.pin), "pin %q", tt.pin)
}
}
func TestExposeKey(t *testing.T) {
assert.Equal(t, "peer1:example.com", exposeKey("peer1", "example.com"))
assert.Equal(t, "peer2:other.com", exposeKey("peer2", "other.com"))
assert.NotEqual(t, exposeKey("peer1", "a.com"), exposeKey("peer1", "b.com"))
}
func TestCountPeerExposes(t *testing.T) {
s := &Server{}
// No exposes
assert.Equal(t, 0, s.countPeerExposes("peer1"))
// Add some exposes for different peers
s.activeExposes.Store("peer1:a.com", &activeExpose{peerID: "peer1"})
s.activeExposes.Store("peer1:b.com", &activeExpose{peerID: "peer1"})
s.activeExposes.Store("peer2:a.com", &activeExpose{peerID: "peer2"})
assert.Equal(t, 2, s.countPeerExposes("peer1"), "peer1 should have 2 exposes")
assert.Equal(t, 1, s.countPeerExposes("peer2"), "peer2 should have 1 expose")
assert.Equal(t, 0, s.countPeerExposes("peer3"), "peer3 should have 0 exposes")
}
func TestReapExpiredExposes(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMgr := reverseproxy.NewMockManager(ctrl)
s := &Server{}
s.SetReverseProxyManager(mockMgr)
now := time.Now()
// Add an expired expose and a still-active one
s.activeExposes.Store("peer1:expired.com", &activeExpose{
serviceID: "svc-expired",
domain: "expired.com",
accountID: "acct1",
peerID: "peer1",
lastRenewed: now.Add(-2 * exposeTTL),
})
s.activeExposes.Store("peer1:active.com", &activeExpose{
serviceID: "svc-active",
domain: "active.com",
accountID: "acct1",
peerID: "peer1",
lastRenewed: now,
})
// Expect ExpireServiceFromPeer called only for the expired one
mockMgr.EXPECT().
ExpireServiceFromPeer(gomock.Any(), "acct1", "peer1", "svc-expired").
Return(nil)
s.reapExpiredExposes()
// Verify expired one is removed
_, exists := s.activeExposes.Load("peer1:expired.com")
assert.False(t, exists, "expired expose should be removed")
// Verify active one remains
_, exists = s.activeExposes.Load("peer1:active.com")
assert.True(t, exists, "active expose should remain")
}
func TestCleanupExpose_Delete(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMgr := reverseproxy.NewMockManager(ctrl)
s := &Server{}
s.SetReverseProxyManager(mockMgr)
mockMgr.EXPECT().
DeleteServiceFromPeer(gomock.Any(), "acct1", "peer1", "svc1").
Return(nil)
s.cleanupExpose(&activeExpose{
serviceID: "svc1",
accountID: "acct1",
peerID: "peer1",
}, false)
}
func TestCleanupExpose_Expire(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMgr := reverseproxy.NewMockManager(ctrl)
s := &Server{}
s.SetReverseProxyManager(mockMgr)
mockMgr.EXPECT().
ExpireServiceFromPeer(gomock.Any(), "acct1", "peer1", "svc1").
Return(nil)
s.cleanupExpose(&activeExpose{
serviceID: "svc1",
accountID: "acct1",
peerID: "peer1",
}, true)
}
func TestCleanupExpose_NilManager(t *testing.T) {
s := &Server{}
// Should not panic when reverse proxy manager is nil
s.cleanupExpose(&activeExpose{
serviceID: "svc1",
accountID: "acct1",
peerID: "peer1",
}, false)
}
func TestSetReverseProxyManager(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
s := &Server{}
// Initially nil
assert.Nil(t, s.getReverseProxyManager())
mockMgr := reverseproxy.NewMockManager(ctrl)
s.SetReverseProxyManager(mockMgr)
assert.NotNil(t, s.getReverseProxyManager())
// Can set to nil
s.SetReverseProxyManager(nil)
assert.Nil(t, s.getReverseProxyManager())
}
func TestReapExpiredExposes_ConcurrentSafety(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMgr := reverseproxy.NewMockManager(ctrl)
mockMgr.EXPECT().
ExpireServiceFromPeer(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()
s := &Server{}
s.SetReverseProxyManager(mockMgr)
// Pre-populate with expired sessions
for i := range 20 {
peerID := "peer1"
domain := "domain-" + string(rune('a'+i))
s.activeExposes.Store(exposeKey(peerID, domain), &activeExpose{
serviceID: "svc-" + domain,
domain: domain,
accountID: "acct1",
peerID: peerID,
lastRenewed: time.Now().Add(-2 * exposeTTL),
})
}
// Run reaper concurrently with count
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
s.reapExpiredExposes()
}()
go func() {
defer wg.Done()
s.countPeerExposes("peer1")
}()
wg.Wait()
assert.Equal(t, 0, s.countPeerExposes("peer1"), "all expired exposes should be reaped")
}
func TestActiveExposeMutexProtectsLastRenewed(t *testing.T) {
expose := &activeExpose{
lastRenewed: time.Now().Add(-1 * time.Hour),
}
// Simulate concurrent renew and read
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for range 100 {
expose.mu.Lock()
expose.lastRenewed = time.Now()
expose.mu.Unlock()
}
}()
go func() {
defer wg.Done()
for range 100 {
expose.mu.Lock()
_ = time.Since(expose.lastRenewed)
expose.mu.Unlock()
}
}()
wg.Wait()
expose.mu.Lock()
require.False(t, expose.lastRenewed.IsZero(), "lastRenewed should not be zero after concurrent access")
expose.mu.Unlock()
}

View File

@@ -76,6 +76,22 @@ func (m *mockReverseProxyManager) GetServiceIDByTargetID(_ context.Context, _, _
return "", nil
}
func (m *mockReverseProxyManager) ValidateExposePermission(_ context.Context, _, _ string) error {
return nil
}
func (m *mockReverseProxyManager) CreateServiceFromPeer(_ context.Context, _, _ string, _ *reverseproxy.Service) (*reverseproxy.Service, error) {
return &reverseproxy.Service{}, nil
}
func (m *mockReverseProxyManager) DeleteServiceFromPeer(_ context.Context, _, _, _ string) error {
return nil
}
func (m *mockReverseProxyManager) ExpireServiceFromPeer(_ context.Context, _, _, _ string) error {
return nil
}
type mockUsersManager struct {
users map[string]*types.User
err error

View File

@@ -26,6 +26,7 @@ import (
"github.com/netbirdio/netbird/shared/management/client/common"
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy"
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/idp"
"github.com/netbirdio/netbird/management/server/job"
@@ -80,6 +81,11 @@ type Server struct {
syncSem atomic.Int32
syncLimEnabled bool
syncLim int32
activeExposes sync.Map
exposeCreateMu sync.Mutex
reverseProxyManager reverseproxy.Manager
reverseProxyMu sync.RWMutex
}
// NewServer creates a new Management server

View File

@@ -295,6 +295,22 @@ func (m *testValidateSessionProxyManager) GetServiceIDByTargetID(_ context.Conte
return "", nil
}
func (m *testValidateSessionProxyManager) ValidateExposePermission(_ context.Context, _, _ string) error {
return nil
}
func (m *testValidateSessionProxyManager) CreateServiceFromPeer(_ context.Context, _, _ string, _ *reverseproxy.Service) (*reverseproxy.Service, error) {
return nil, nil
}
func (m *testValidateSessionProxyManager) DeleteServiceFromPeer(_ context.Context, _, _, _ string) error {
return nil
}
func (m *testValidateSessionProxyManager) ExpireServiceFromPeer(_ context.Context, _, _, _ string) error {
return nil
}
type testValidateSessionUsersManager struct {
store store.Store
}