mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-23 02:36:42 +00:00
move to reverse proxy and update api
This commit is contained in:
@@ -6,18 +6,19 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/modules/services"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
type serviceStore interface {
|
||||
GetAccountServices(ctx context.Context, lockStrength store.LockingStrength, accountID string) ([]*services.Service, error)
|
||||
type reverseProxyStore interface {
|
||||
GetAccountReverseProxies(ctx context.Context, lockStrength store.LockingStrength, accountID string) ([]*reverseproxy.ReverseProxy, error)
|
||||
}
|
||||
|
||||
type keyStore interface {
|
||||
@@ -31,11 +32,11 @@ type ProxyServiceServer struct {
|
||||
// Map of connected proxies: proxy_id -> proxy connection
|
||||
connectedProxies sync.Map
|
||||
|
||||
// Channel for broadcasting service updates to all proxies
|
||||
// Channel for broadcasting reverse proxy updates to all proxies
|
||||
updatesChan chan *proto.ProxyMapping
|
||||
|
||||
// Store of services
|
||||
serviceStore serviceStore
|
||||
// Store of reverse proxies
|
||||
reverseProxyStore reverseProxyStore
|
||||
|
||||
// Store for client setup keys
|
||||
keyStore keyStore
|
||||
@@ -52,10 +53,10 @@ type proxyConnection struct {
|
||||
}
|
||||
|
||||
// NewProxyServiceServer creates a new proxy service server
|
||||
func NewProxyServiceServer(store serviceStore) *ProxyServiceServer {
|
||||
func NewProxyServiceServer(store reverseProxyStore) *ProxyServiceServer {
|
||||
return &ProxyServiceServer{
|
||||
updatesChan: make(chan *proto.ProxyMapping, 100),
|
||||
serviceStore: store,
|
||||
updatesChan: make(chan *proto.ProxyMapping, 100),
|
||||
reverseProxyStore: store,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,51 +111,51 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
|
||||
}
|
||||
}
|
||||
|
||||
// sendSnapshot sends the initial snapshot of all services to proxy
|
||||
// sendSnapshot sends the initial snapshot of all reverse proxies to proxy
|
||||
func (s *ProxyServiceServer) sendSnapshot(ctx context.Context, conn *proxyConnection) error {
|
||||
svcs, err := s.serviceStore.GetAccountServices(ctx, store.LockingStrengthNone, conn.proxyID) // TODO: check locking strength and accountID.
|
||||
reverseProxies, err := s.reverseProxyStore.GetAccountReverseProxies(ctx, store.LockingStrengthNone, conn.proxyID) // TODO: check locking strength and accountID.
|
||||
if err != nil {
|
||||
// TODO: something
|
||||
return fmt.Errorf("get account services from store: %w", err)
|
||||
return fmt.Errorf("get account reverse proxies from store: %w", err)
|
||||
}
|
||||
|
||||
for _, svc := range svcs {
|
||||
if !svc.Enabled || !svc.Exposed {
|
||||
// We don't care about disabled services for snapshots.
|
||||
for _, rp := range reverseProxies {
|
||||
if !rp.Enabled {
|
||||
// We don't care about disabled reverse proxies for snapshots.
|
||||
continue
|
||||
}
|
||||
|
||||
// Fill auth values.
|
||||
// TODO: This will be removed soon as the management server should be handling authentication rather than the proxy, so probably not much use in fleshing this out too much.
|
||||
auth := &proto.Authentication{}
|
||||
if svc.AuthBearerEnabled {
|
||||
if rp.Auth.BearerAuth != nil && rp.Auth.BearerAuth.Enabled {
|
||||
auth.Oidc = &proto.OIDC{
|
||||
Enabled: true,
|
||||
// TODO: fill other OIDC fields from account OIDC settings.
|
||||
}
|
||||
}
|
||||
if svc.AuthBasicPassword != "" {
|
||||
if rp.Auth.PasswordAuth != nil && rp.Auth.PasswordAuth.Password != "" {
|
||||
auth.Basic = &proto.HTTPBasic{
|
||||
Enabled: true,
|
||||
Username: svc.AuthBasicUsername,
|
||||
Password: svc.AuthBasicPassword,
|
||||
Username: "",
|
||||
Password: rp.Auth.PasswordAuth.Password,
|
||||
}
|
||||
}
|
||||
if svc.AuthPINValue != "" {
|
||||
if rp.Auth.PinAuth != nil && rp.Auth.PinAuth.Pin != "" {
|
||||
auth.Pin = &proto.Pin{
|
||||
Enabled: true,
|
||||
Pin: svc.AuthPINValue,
|
||||
Pin: rp.Auth.PinAuth.Pin,
|
||||
}
|
||||
}
|
||||
|
||||
var paths []*proto.PathMapping
|
||||
for _, t := range svc.Targets {
|
||||
for _, t := range rp.Targets {
|
||||
if !t.Enabled {
|
||||
// We don't care about disabled service targets for snapshots.
|
||||
// We don't care about disabled reverse proxy targets for snapshots.
|
||||
continue
|
||||
}
|
||||
paths = append(paths, &proto.PathMapping{
|
||||
Path: t.Path,
|
||||
Path: *t.Path,
|
||||
Target: t.Host,
|
||||
})
|
||||
}
|
||||
@@ -179,8 +180,8 @@ func (s *ProxyServiceServer) sendSnapshot(ctx context.Context, conn *proxyConnec
|
||||
Mapping: []*proto.ProxyMapping{
|
||||
{
|
||||
Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED, // Initial snapshot, all records are "new" for the proxy.
|
||||
Id: svc.ID,
|
||||
Domain: svc.Domain,
|
||||
Id: rp.ID,
|
||||
Domain: rp.Domain,
|
||||
Path: paths,
|
||||
SetupKey: key.Key,
|
||||
Auth: auth,
|
||||
@@ -214,34 +215,34 @@ func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error)
|
||||
// SendAccessLog processes access log from proxy
|
||||
func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendAccessLogRequest) (*proto.SendAccessLogResponse, error) {
|
||||
log.WithFields(log.Fields{
|
||||
"proxy_id": "", // TODO: get proxy id, probably from context or maybe from request message.
|
||||
"service_id": req.GetLog().GetServiceId(),
|
||||
"host": req.GetLog().GetHost(),
|
||||
"path": req.GetLog().GetPath(),
|
||||
"method": req.GetLog().GetMethod(),
|
||||
"response_code": req.GetLog().GetResponseCode(),
|
||||
"duration_ms": req.GetLog().GetDurationMs(),
|
||||
"source_ip": req.GetLog().GetSourceIp(),
|
||||
"auth_mechanism": req.GetLog().GetAuthMechanism(),
|
||||
"user_id": req.GetLog().GetUserId(),
|
||||
"auth_success": req.GetLog().GetAuthSuccess(),
|
||||
"proxy_id": "", // TODO: get proxy id, probably from context or maybe from request message.
|
||||
"reverse_proxy_id": req.GetLog().GetServiceId(),
|
||||
"host": req.GetLog().GetHost(),
|
||||
"path": req.GetLog().GetPath(),
|
||||
"method": req.GetLog().GetMethod(),
|
||||
"response_code": req.GetLog().GetResponseCode(),
|
||||
"duration_ms": req.GetLog().GetDurationMs(),
|
||||
"source_ip": req.GetLog().GetSourceIp(),
|
||||
"auth_mechanism": req.GetLog().GetAuthMechanism(),
|
||||
"user_id": req.GetLog().GetUserId(),
|
||||
"auth_success": req.GetLog().GetAuthSuccess(),
|
||||
}).Info("Access log from proxy")
|
||||
|
||||
// TODO: Store access log in database/metrics system
|
||||
return &proto.SendAccessLogResponse{}, nil
|
||||
}
|
||||
|
||||
// SendServiceUpdate broadcasts a service update to all connected proxies.
|
||||
// Management should call this when services are created/updated/removed
|
||||
func (s *ProxyServiceServer) SendServiceUpdate(update *proto.ProxyMapping) {
|
||||
// SendReverseProxyUpdate broadcasts a reverse proxy update to all connected proxies.
|
||||
// Management should call this when reverse proxies are created/updated/removed
|
||||
func (s *ProxyServiceServer) SendReverseProxyUpdate(update *proto.ProxyMapping) {
|
||||
// Send it to all connected proxies
|
||||
s.connectedProxies.Range(func(key, value interface{}) bool {
|
||||
conn := value.(*proxyConnection)
|
||||
select {
|
||||
case conn.sendChan <- update:
|
||||
log.Debugf("Sent service update to proxy %s", conn.proxyID)
|
||||
log.Debugf("Sent reverse proxy update to proxy %s", conn.proxyID)
|
||||
default:
|
||||
log.Warnf("Failed to send service update to proxy %s (channel full)", conn.proxyID)
|
||||
log.Warnf("Failed to send reverse proxy update to proxy %s (channel full)", conn.proxyID)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user