mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-22 02:06:39 +00:00
[proxy] feature: bring your own proxy
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -51,6 +52,11 @@ type ClusterInfo struct {
|
||||
ConnectedProxies int
|
||||
}
|
||||
|
||||
// ProxyTokenChecker checks whether a proxy access token is still valid.
|
||||
type ProxyTokenChecker interface {
|
||||
IsProxyAccessTokenValid(ctx context.Context, tokenID string) (bool, error)
|
||||
}
|
||||
|
||||
// ProxyServiceServer implements the ProxyService gRPC server
|
||||
type ProxyServiceServer struct {
|
||||
proto.UnimplementedProxyServiceServer
|
||||
@@ -79,6 +85,9 @@ type ProxyServiceServer struct {
|
||||
// Store for one-time authentication tokens
|
||||
tokenStore *OneTimeTokenStore
|
||||
|
||||
// Checker for proxy access token validity
|
||||
tokenChecker ProxyTokenChecker
|
||||
|
||||
// OIDC configuration for proxy authentication
|
||||
oidcConfig ProxyOIDCConfig
|
||||
|
||||
@@ -90,16 +99,29 @@ const pkceVerifierTTL = 10 * time.Minute
|
||||
|
||||
// proxyConnection represents a connected proxy
|
||||
type proxyConnection struct {
|
||||
proxyID string
|
||||
address string
|
||||
stream proto.ProxyService_GetMappingUpdateServer
|
||||
sendChan chan *proto.GetMappingUpdateResponse
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
proxyID string
|
||||
address string
|
||||
accountID *string
|
||||
tokenID string
|
||||
stream proto.ProxyService_GetMappingUpdateServer
|
||||
sendChan chan *proto.GetMappingUpdateResponse
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func enforceAccountScope(ctx context.Context, requestAccountID string) error {
|
||||
token := GetProxyTokenFromContext(ctx)
|
||||
if token == nil || token.AccountID == nil {
|
||||
return nil
|
||||
}
|
||||
if requestAccountID == "" || *token.AccountID != requestAccountID {
|
||||
return status.Errorf(codes.PermissionDenied, "account-scoped token cannot access account %s", requestAccountID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewProxyServiceServer creates a new proxy service server.
|
||||
func NewProxyServiceServer(accessLogMgr accesslogs.Manager, tokenStore *OneTimeTokenStore, pkceStore *PKCEVerifierStore, oidcConfig ProxyOIDCConfig, peersManager peers.Manager, usersManager users.Manager, proxyMgr proxy.Manager) *ProxyServiceServer {
|
||||
func NewProxyServiceServer(accessLogMgr accesslogs.Manager, tokenStore *OneTimeTokenStore, pkceStore *PKCEVerifierStore, oidcConfig ProxyOIDCConfig, peersManager peers.Manager, usersManager users.Manager, proxyMgr proxy.Manager, tokenChecker ProxyTokenChecker) *ProxyServiceServer {
|
||||
ctx := context.Background()
|
||||
s := &ProxyServiceServer{
|
||||
accessLogManager: accessLogMgr,
|
||||
@@ -109,6 +131,7 @@ func NewProxyServiceServer(accessLogMgr accesslogs.Manager, tokenStore *OneTimeT
|
||||
peersManager: peersManager,
|
||||
usersManager: usersManager,
|
||||
proxyManager: proxyMgr,
|
||||
tokenChecker: tokenChecker,
|
||||
}
|
||||
go s.cleanupStaleProxies(ctx)
|
||||
return s
|
||||
@@ -155,14 +178,57 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
|
||||
return status.Errorf(codes.InvalidArgument, "proxy address is invalid")
|
||||
}
|
||||
|
||||
var accountID *string
|
||||
token := GetProxyTokenFromContext(ctx)
|
||||
if token != nil && token.AccountID != nil {
|
||||
accountID = token.AccountID
|
||||
|
||||
existingProxy, _ := s.proxyManager.GetAccountProxy(ctx, *accountID)
|
||||
if existingProxy != nil && existingProxy.ID != proxyID {
|
||||
if existingProxy.Status == proxy.StatusConnected {
|
||||
return status.Errorf(codes.ResourceExhausted, "limit of 1 self-hosted proxy per account")
|
||||
}
|
||||
if err := s.proxyManager.DeleteProxy(ctx, existingProxy.ID); err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to cleanup disconnected proxy %s: %v", existingProxy.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
available, err := s.proxyManager.IsClusterAddressAvailable(ctx, proxyAddress, *accountID)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "check cluster address: %v", err)
|
||||
}
|
||||
if !available {
|
||||
return status.Errorf(codes.AlreadyExists, "cluster address %s is already in use", proxyAddress)
|
||||
}
|
||||
}
|
||||
|
||||
var tokenID string
|
||||
if token != nil {
|
||||
tokenID = token.ID
|
||||
}
|
||||
|
||||
connCtx, cancel := context.WithCancel(ctx)
|
||||
conn := &proxyConnection{
|
||||
proxyID: proxyID,
|
||||
address: proxyAddress,
|
||||
stream: stream,
|
||||
sendChan: make(chan *proto.GetMappingUpdateResponse, 100),
|
||||
ctx: connCtx,
|
||||
cancel: cancel,
|
||||
proxyID: proxyID,
|
||||
address: proxyAddress,
|
||||
accountID: accountID,
|
||||
tokenID: tokenID,
|
||||
stream: stream,
|
||||
sendChan: make(chan *proto.GetMappingUpdateResponse, 100),
|
||||
ctx: connCtx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
// Register proxy in database
|
||||
if err := s.proxyManager.Connect(ctx, proxyID, proxyAddress, peerInfo, accountID); err != nil {
|
||||
if accountID != nil {
|
||||
cancel()
|
||||
if strings.Contains(err.Error(), "UNIQUE constraint") || strings.Contains(err.Error(), "duplicate key") || strings.Contains(err.Error(), "idx_proxy_account_id_unique") {
|
||||
return status.Errorf(codes.ResourceExhausted, "limit of 1 self-hosted proxy per account")
|
||||
}
|
||||
return status.Errorf(codes.Internal, "failed to register BYOD proxy: %v", err)
|
||||
}
|
||||
log.WithContext(ctx).Warnf("Failed to register proxy %s in database: %v", proxyID, err)
|
||||
}
|
||||
|
||||
s.connectedProxies.Store(proxyID, conn)
|
||||
@@ -170,15 +236,11 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
|
||||
log.WithContext(ctx).Warnf("Failed to register proxy %s in cluster: %v", proxyID, err)
|
||||
}
|
||||
|
||||
// Register proxy in database
|
||||
if err := s.proxyManager.Connect(ctx, proxyID, proxyAddress, peerInfo); err != nil {
|
||||
log.WithContext(ctx).Warnf("Failed to register proxy %s in database: %v", proxyID, err)
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"proxy_id": proxyID,
|
||||
"address": proxyAddress,
|
||||
"cluster_addr": proxyAddress,
|
||||
"account_id": accountID,
|
||||
"total_proxies": len(s.GetConnectedProxies()),
|
||||
}).Info("Proxy registered in cluster")
|
||||
defer func() {
|
||||
@@ -203,7 +265,7 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
|
||||
go s.sender(conn, errChan)
|
||||
|
||||
// Start heartbeat goroutine
|
||||
go s.heartbeat(connCtx, proxyID)
|
||||
go s.heartbeat(connCtx, conn)
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
@@ -213,16 +275,30 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
|
||||
}
|
||||
}
|
||||
|
||||
// heartbeat updates the proxy's last_seen timestamp every minute
|
||||
func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID string) {
|
||||
// heartbeat updates the proxy's last_seen timestamp every minute and
|
||||
// validates that the proxy's access token is still valid.
|
||||
func (s *ProxyServiceServer) heartbeat(ctx context.Context, conn *proxyConnection) {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := s.proxyManager.Heartbeat(ctx, proxyID); err != nil {
|
||||
log.WithContext(ctx).Debugf("Failed to update proxy %s heartbeat: %v", proxyID, err)
|
||||
if err := s.proxyManager.Heartbeat(ctx, conn.proxyID); err != nil {
|
||||
log.WithContext(ctx).Debugf("Failed to update proxy %s heartbeat: %v", conn.proxyID, err)
|
||||
}
|
||||
|
||||
if conn.tokenID != "" && s.tokenChecker != nil {
|
||||
valid, err := s.tokenChecker.IsProxyAccessTokenValid(ctx, conn.tokenID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to check token validity for proxy %s: %v", conn.proxyID, err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.WithContext(ctx).Warnf("proxy %s token revoked or expired, disconnecting", conn.proxyID)
|
||||
conn.cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -232,8 +308,15 @@ func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID string) {
|
||||
|
||||
// sendSnapshot sends the initial snapshot of services to the connecting proxy.
|
||||
// Only services matching the proxy's cluster address are sent.
|
||||
// For BYOD proxies (account-scoped), only account services are sent.
|
||||
func (s *ProxyServiceServer) sendSnapshot(ctx context.Context, conn *proxyConnection) error {
|
||||
services, err := s.serviceManager.GetGlobalServices(ctx)
|
||||
var services []*rpservice.Service
|
||||
var err error
|
||||
if conn.accountID != nil {
|
||||
services, err = s.serviceManager.GetAccountServices(ctx, *conn.accountID)
|
||||
} else {
|
||||
services, err = s.serviceManager.GetGlobalServices(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("get services from store: %w", err)
|
||||
}
|
||||
@@ -295,8 +378,14 @@ func (s *ProxyServiceServer) sendSnapshot(ctx context.Context, conn *proxyConnec
|
||||
return nil
|
||||
}
|
||||
|
||||
// isProxyAddressValid validates a proxy address
|
||||
// isProxyAddressValid validates a proxy address (domain name or IP address)
|
||||
func isProxyAddressValid(addr string) bool {
|
||||
if addr == "" {
|
||||
return false
|
||||
}
|
||||
if net.ParseIP(addr) != nil {
|
||||
return true
|
||||
}
|
||||
_, err := domain.ValidateDomains([]string{addr})
|
||||
return err == nil
|
||||
}
|
||||
@@ -320,6 +409,10 @@ func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error)
|
||||
func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendAccessLogRequest) (*proto.SendAccessLogResponse, error) {
|
||||
accessLog := req.GetLog()
|
||||
|
||||
if err := enforceAccountScope(ctx, accessLog.GetAccountId()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fields := log.Fields{
|
||||
"service_id": accessLog.GetServiceId(),
|
||||
"account_id": accessLog.GetAccountId(),
|
||||
@@ -357,10 +450,18 @@ func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendA
|
||||
// Management should call this when services are created/updated/removed.
|
||||
// For create/update operations a unique one-time auth token is generated per
|
||||
// proxy so that every replica can independently authenticate with management.
|
||||
// BYOD proxies only receive updates for their own account's services.
|
||||
func (s *ProxyServiceServer) SendServiceUpdate(update *proto.GetMappingUpdateResponse) {
|
||||
log.Debugf("Broadcasting service update to all connected proxy servers")
|
||||
var updateAccountID string
|
||||
if len(update.Mapping) > 0 {
|
||||
updateAccountID = update.Mapping[0].AccountId
|
||||
}
|
||||
s.connectedProxies.Range(func(key, value interface{}) bool {
|
||||
conn := value.(*proxyConnection)
|
||||
if conn.accountID != nil && updateAccountID != "" && *conn.accountID != updateAccountID {
|
||||
return true
|
||||
}
|
||||
msg := s.perProxyMessage(update, conn.proxyID)
|
||||
if msg == nil {
|
||||
return true
|
||||
@@ -375,6 +476,16 @@ func (s *ProxyServiceServer) SendServiceUpdate(update *proto.GetMappingUpdateRes
|
||||
})
|
||||
}
|
||||
|
||||
// ForceDisconnect cancels the gRPC stream for a connected proxy, causing it to disconnect.
|
||||
func (s *ProxyServiceServer) ForceDisconnect(proxyID string) {
|
||||
if connVal, ok := s.connectedProxies.Load(proxyID); ok {
|
||||
conn := connVal.(*proxyConnection)
|
||||
conn.cancel()
|
||||
s.connectedProxies.Delete(proxyID)
|
||||
log.WithFields(log.Fields{"proxyID": proxyID}).Info("force disconnected proxy")
|
||||
}
|
||||
}
|
||||
|
||||
// GetConnectedProxies returns a list of connected proxy IDs
|
||||
func (s *ProxyServiceServer) GetConnectedProxies() []string {
|
||||
var proxies []string
|
||||
@@ -440,6 +551,9 @@ func (s *ProxyServiceServer) SendServiceUpdateToCluster(ctx context.Context, upd
|
||||
for _, proxyID := range proxyIDs {
|
||||
if connVal, ok := s.connectedProxies.Load(proxyID); ok {
|
||||
conn := connVal.(*proxyConnection)
|
||||
if conn.accountID != nil && update.AccountId != "" && *conn.accountID != update.AccountId {
|
||||
continue
|
||||
}
|
||||
msg := s.perProxyMessage(updateResponse, proxyID)
|
||||
if msg == nil {
|
||||
continue
|
||||
@@ -499,6 +613,10 @@ func shallowCloneMapping(m *proto.ProxyMapping) *proto.ProxyMapping {
|
||||
}
|
||||
|
||||
func (s *ProxyServiceServer) Authenticate(ctx context.Context, req *proto.AuthenticateRequest) (*proto.AuthenticateResponse, error) {
|
||||
if err := enforceAccountScope(ctx, req.GetAccountId()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service, err := s.serviceManager.GetServiceByID(ctx, req.GetAccountId(), req.GetId())
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Debugf("failed to get service from store: %v", err)
|
||||
@@ -587,6 +705,10 @@ func (s *ProxyServiceServer) generateSessionToken(ctx context.Context, authentic
|
||||
|
||||
// SendStatusUpdate handles status updates from proxy clients
|
||||
func (s *ProxyServiceServer) SendStatusUpdate(ctx context.Context, req *proto.SendStatusUpdateRequest) (*proto.SendStatusUpdateResponse, error) {
|
||||
if err := enforceAccountScope(ctx, req.GetAccountId()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
accountID := req.GetAccountId()
|
||||
serviceID := req.GetServiceId()
|
||||
protoStatus := req.GetStatus()
|
||||
@@ -653,6 +775,10 @@ func protoStatusToInternal(protoStatus proto.ProxyStatus) rpservice.Status {
|
||||
|
||||
// CreateProxyPeer handles proxy peer creation with one-time token authentication
|
||||
func (s *ProxyServiceServer) CreateProxyPeer(ctx context.Context, req *proto.CreateProxyPeerRequest) (*proto.CreateProxyPeerResponse, error) {
|
||||
if err := enforceAccountScope(ctx, req.GetAccountId()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
serviceID := req.GetServiceId()
|
||||
accountID := req.GetAccountId()
|
||||
token := req.GetToken()
|
||||
@@ -707,6 +833,10 @@ func strPtr(s string) *string {
|
||||
}
|
||||
|
||||
func (s *ProxyServiceServer) GetOIDCURL(ctx context.Context, req *proto.GetOIDCURLRequest) (*proto.GetOIDCURLResponse, error) {
|
||||
if err := enforceAccountScope(ctx, req.GetAccountId()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
redirectURL, err := url.Parse(req.GetRedirectUrl())
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "parse redirect url: %v", err)
|
||||
@@ -829,21 +959,9 @@ func (s *ProxyServiceServer) ValidateState(state string) (verifier, redirectURL
|
||||
|
||||
// GenerateSessionToken creates a signed session JWT for the given domain and user.
|
||||
func (s *ProxyServiceServer) GenerateSessionToken(ctx context.Context, domain, userID string, method proxyauth.Method) (string, error) {
|
||||
// Find the service by domain to get its signing key
|
||||
services, err := s.serviceManager.GetGlobalServices(ctx)
|
||||
service, err := s.getServiceByDomain(ctx, domain)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("get services: %w", err)
|
||||
}
|
||||
|
||||
var service *rpservice.Service
|
||||
for _, svc := range services {
|
||||
if svc.Domain == domain {
|
||||
service = svc
|
||||
break
|
||||
}
|
||||
}
|
||||
if service == nil {
|
||||
return "", fmt.Errorf("service not found for domain: %s", domain)
|
||||
return "", fmt.Errorf("service not found for domain %s: %w", domain, err)
|
||||
}
|
||||
|
||||
if service.SessionPrivateKey == "" {
|
||||
@@ -941,6 +1059,10 @@ func (s *ProxyServiceServer) ValidateSession(ctx context.Context, req *proto.Val
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := enforceAccountScope(ctx, service.AccountID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pubKeyBytes, err := base64.StdEncoding.DecodeString(service.SessionPublicKey)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
@@ -1024,18 +1146,7 @@ func (s *ProxyServiceServer) ValidateSession(ctx context.Context, req *proto.Val
|
||||
}
|
||||
|
||||
func (s *ProxyServiceServer) getServiceByDomain(ctx context.Context, domain string) (*rpservice.Service, error) {
|
||||
services, err := s.serviceManager.GetGlobalServices(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get services: %w", err)
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
if service.Domain == domain {
|
||||
return service, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("service not found for domain: %s", domain)
|
||||
return s.serviceManager.GetServiceByDomain(ctx, domain)
|
||||
}
|
||||
|
||||
func (s *ProxyServiceServer) checkGroupAccess(service *rpservice.Service, user *types.User) error {
|
||||
|
||||
Reference in New Issue
Block a user