apply snapshot mappings concurrently while maintaining management backpressure

This commit is contained in:
pascal
2026-05-18 15:44:32 +02:00
parent 705f87fc20
commit ddc4c20a31
9 changed files with 2246 additions and 98 deletions

View File

@@ -136,9 +136,12 @@ type proxyConnection struct {
tokenID string
capabilities *proto.ProxyCapabilities
stream proto.ProxyService_GetMappingUpdateServer
sendChan chan *proto.GetMappingUpdateResponse
ctx context.Context
cancel context.CancelFunc
// syncStream is set when the proxy connected via SyncMappings.
// When non-nil, the sender goroutine uses this instead of stream.
syncStream proto.ProxyService_SyncMappingsServer
sendChan chan *proto.GetMappingUpdateResponse
ctx context.Context
cancel context.CancelFunc
}
func enforceAccountScope(ctx context.Context, requestAccountID string) error {
@@ -345,6 +348,223 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
}
}
// SyncMappings implements the bidirectional SyncMappings RPC.
// It mirrors GetMappingUpdate but provides application-level back-pressure:
// management waits for an ack from the proxy before sending the next batch.
func (s *ProxyServiceServer) SyncMappings(stream proto.ProxyService_SyncMappingsServer) error {
ctx := stream.Context()
peerInfo := PeerIPFromContext(ctx)
log.Infof("New proxy SyncMappings connection from %s", peerInfo)
// First message must be init.
firstMsg, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Internal, "receive init: %v", err)
}
init := firstMsg.GetInit()
if init == nil {
return status.Errorf(codes.InvalidArgument, "first message must be init")
}
proxyID := init.GetProxyId()
if proxyID == "" {
return status.Errorf(codes.InvalidArgument, "proxy_id is required")
}
proxyAddress := init.GetAddress()
if !isProxyAddressValid(proxyAddress) {
return status.Errorf(codes.InvalidArgument, "proxy address is invalid")
}
var accountID *string
token := GetProxyTokenFromContext(ctx)
if token != nil && token.AccountID != nil {
accountID = token.AccountID
available, err := s.proxyManager.IsClusterAddressAvailable(ctx, proxyAddress, *accountID)
if err != nil {
return status.Errorf(codes.Internal, "check cluster address: %v", err)
}
if !available {
return status.Errorf(codes.AlreadyExists, "cluster address %s is already in use", proxyAddress)
}
}
var tokenID string
if token != nil {
tokenID = token.ID
}
sessionID := uuid.NewString()
if old, loaded := s.connectedProxies.Load(proxyID); loaded {
oldConn := old.(*proxyConnection)
log.WithFields(log.Fields{
"proxy_id": proxyID,
"old_session_id": oldConn.sessionID,
"new_session_id": sessionID,
}).Info("Superseding existing proxy connection")
oldConn.cancel()
}
connCtx, cancel := context.WithCancel(ctx)
conn := &proxyConnection{
proxyID: proxyID,
sessionID: sessionID,
address: proxyAddress,
accountID: accountID,
tokenID: tokenID,
capabilities: init.GetCapabilities(),
syncStream: stream,
sendChan: make(chan *proto.GetMappingUpdateResponse, 100),
ctx: connCtx,
cancel: cancel,
}
var caps *proxy.Capabilities
if c := init.GetCapabilities(); c != nil {
caps = &proxy.Capabilities{
SupportsCustomPorts: c.SupportsCustomPorts,
RequireSubdomain: c.RequireSubdomain,
SupportsCrowdsec: c.SupportsCrowdsec,
}
}
proxyRecord, err := s.proxyManager.Connect(ctx, proxyID, sessionID, proxyAddress, peerInfo, accountID, caps)
if err != nil {
cancel()
if accountID != nil {
return status.Errorf(codes.Internal, "failed to register BYOP proxy: %v", err)
}
log.WithContext(ctx).Warnf("failed to register proxy %s in database: %v", proxyID, err)
return status.Errorf(codes.Internal, "register proxy in database: %v", err)
}
s.connectedProxies.Store(proxyID, conn)
if err := s.proxyController.RegisterProxyToCluster(ctx, conn.address, proxyID); err != nil {
log.WithContext(ctx).Warnf("Failed to register proxy %s in cluster: %v", proxyID, err)
}
if err := s.sendSnapshotSync(ctx, conn, stream); err != nil {
if s.connectedProxies.CompareAndDelete(proxyID, conn) {
if unregErr := s.proxyController.UnregisterProxyFromCluster(context.Background(), conn.address, proxyID); unregErr != nil {
log.WithContext(ctx).Debugf("cleanup after snapshot failure for proxy %s: %v", proxyID, unregErr)
}
}
cancel()
if disconnErr := s.proxyManager.Disconnect(context.Background(), proxyID, sessionID); disconnErr != nil {
log.WithContext(ctx).Debugf("cleanup after snapshot failure for proxy %s: %v", proxyID, disconnErr)
}
return fmt.Errorf("send snapshot to proxy %s: %w", proxyID, err)
}
errChan := make(chan error, 2)
go s.sender(conn, errChan)
// Drain acks from the proxy in the background so the stream stays healthy.
// After the snapshot phase, the proxy may still send acks for incremental
// updates; we simply discard them.
go func() {
for {
if _, err := stream.Recv(); err != nil {
errChan <- err
return
}
}
}()
log.WithFields(log.Fields{
"proxy_id": proxyID,
"session_id": sessionID,
"address": proxyAddress,
"cluster_addr": proxyAddress,
"account_id": accountID,
"total_proxies": len(s.GetConnectedProxies()),
}).Info("Proxy registered in cluster (SyncMappings)")
defer func() {
if !s.connectedProxies.CompareAndDelete(proxyID, conn) {
log.Infof("Proxy %s session %s: skipping cleanup, superseded by new connection", proxyID, sessionID)
cancel()
return
}
if err := s.proxyController.UnregisterProxyFromCluster(context.Background(), conn.address, proxyID); err != nil {
log.Warnf("Failed to unregister proxy %s from cluster: %v", proxyID, err)
}
if err := s.proxyManager.Disconnect(context.Background(), proxyID, sessionID); err != nil {
log.Warnf("Failed to mark proxy %s as disconnected: %v", proxyID, err)
}
cancel()
log.Infof("Proxy %s session %s disconnected", proxyID, sessionID)
}()
go s.heartbeat(connCtx, conn, proxyRecord)
select {
case err := <-errChan:
log.WithContext(ctx).Warnf("Failed to send update: %v", err)
return fmt.Errorf("send update to proxy %s: %w", proxyID, err)
case <-connCtx.Done():
log.WithContext(ctx).Infof("Proxy %s context canceled", proxyID)
return connCtx.Err()
}
}
// sendSnapshotSync sends the initial snapshot with back-pressure: it sends
// one batch, then waits for the proxy to ack before sending the next.
func (s *ProxyServiceServer) sendSnapshotSync(ctx context.Context, conn *proxyConnection, stream proto.ProxyService_SyncMappingsServer) error {
if !isProxyAddressValid(conn.address) {
return fmt.Errorf("proxy address is invalid")
}
mappings, err := s.snapshotServiceMappings(ctx, conn)
if err != nil {
return err
}
for i := 0; i < len(mappings); i += s.snapshotBatchSize {
end := i + s.snapshotBatchSize
if end > len(mappings) {
end = len(mappings)
}
for _, m := range mappings[i:end] {
token, err := s.tokenStore.GenerateToken(m.AccountId, m.Id, s.proxyTokenTTL())
if err != nil {
return fmt.Errorf("generate auth token for service %s: %w", m.Id, err)
}
m.AuthToken = token
}
if err := stream.Send(&proto.SyncMappingsResponse{
Mapping: mappings[i:end],
InitialSyncComplete: end == len(mappings),
}); err != nil {
return fmt.Errorf("send snapshot batch: %w", err)
}
// Wait for ack before sending the next batch.
if end < len(mappings) {
msg, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive ack: %w", err)
}
if msg.GetAck() == nil {
return fmt.Errorf("expected ack, got %T", msg.GetMsg())
}
}
}
if len(mappings) == 0 {
if err := stream.Send(&proto.SyncMappingsResponse{
InitialSyncComplete: true,
}); err != nil {
return fmt.Errorf("send snapshot completion: %w", err)
}
}
return nil
}
// heartbeat updates the proxy's last_seen timestamp every minute and
// disconnects the proxy if its access token has been revoked.
func (s *ProxyServiceServer) heartbeat(ctx context.Context, conn *proxyConnection, p *proxy.Proxy) {
@@ -460,12 +680,14 @@ func isProxyAddressValid(addr string) bool {
return err == nil
}
// sender handles sending messages to proxy
// sender handles sending messages to proxy.
// When conn.syncStream is set the message is sent as SyncMappingsResponse;
// otherwise the legacy GetMappingUpdateResponse stream is used.
func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error) {
for {
select {
case resp := <-conn.sendChan:
if err := conn.stream.Send(resp); err != nil {
if err := conn.sendResponse(resp); err != nil {
errChan <- err
return
}
@@ -475,6 +697,17 @@ func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error)
}
}
// sendResponse sends a mapping update on whichever stream the proxy connected with.
func (conn *proxyConnection) sendResponse(resp *proto.GetMappingUpdateResponse) error {
if conn.syncStream != nil {
return conn.syncStream.Send(&proto.SyncMappingsResponse{
Mapping: resp.Mapping,
InitialSyncComplete: resp.InitialSyncComplete,
})
}
return conn.stream.Send(resp)
}
// SendAccessLog processes access log from proxy
func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendAccessLogRequest) (*proto.SendAccessLogResponse, error) {
accessLog := req.GetLog()
@@ -541,8 +774,8 @@ func (s *ProxyServiceServer) SendServiceUpdate(update *proto.GetMappingUpdateRes
return true
}
connUpdate = &proto.GetMappingUpdateResponse{
Mapping: filtered,
InitialSyncComplete: update.InitialSyncComplete,
Mapping: filtered,
InitialSyncComplete: update.InitialSyncComplete,
}
}
resp := s.perProxyMessage(connUpdate, conn.proxyID)

View File

@@ -0,0 +1,405 @@
package grpc
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
rpservice "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
"github.com/netbirdio/netbird/shared/management/proto"
)
// syncRecordingStream is a mock ProxyService_SyncMappingsServer that records
// sent messages and returns pre-loaded ack responses from Recv.
type syncRecordingStream struct {
grpc.ServerStream
mu sync.Mutex
sent []*proto.SyncMappingsResponse
recvMsgs []*proto.SyncMappingsRequest
recvIdx int
}
func (s *syncRecordingStream) Send(m *proto.SyncMappingsResponse) error {
s.mu.Lock()
defer s.mu.Unlock()
s.sent = append(s.sent, m)
return nil
}
func (s *syncRecordingStream) Recv() (*proto.SyncMappingsRequest, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.recvIdx >= len(s.recvMsgs) {
return nil, fmt.Errorf("no more recv messages")
}
msg := s.recvMsgs[s.recvIdx]
s.recvIdx++
return msg, nil
}
func (s *syncRecordingStream) Context() context.Context { return context.Background() }
func (s *syncRecordingStream) SetHeader(metadata.MD) error { return nil }
func (s *syncRecordingStream) SendHeader(metadata.MD) error { return nil }
func (s *syncRecordingStream) SetTrailer(metadata.MD) {}
func (s *syncRecordingStream) SendMsg(any) error { return nil }
func (s *syncRecordingStream) RecvMsg(any) error { return nil }
func ackMsg() *proto.SyncMappingsRequest {
return &proto.SyncMappingsRequest{
Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}},
}
}
func TestSendSnapshotSync_BatchesWithAcks(t *testing.T) {
const cluster = "cluster.example.com"
const batchSize = 3
const totalServices = 7 // 3 + 3 + 1 → 3 batches, 2 acks needed (not after last)
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
// Provide 2 acks — one after each non-final batch.
stream := &syncRecordingStream{
recvMsgs: []*proto.SyncMappingsRequest{ackMsg(), ackMsg()},
}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.NoError(t, err)
require.Len(t, stream.sent, 3, "should send ceil(7/3) = 3 batches")
assert.Len(t, stream.sent[0].Mapping, 3)
assert.False(t, stream.sent[0].InitialSyncComplete)
assert.Len(t, stream.sent[1].Mapping, 3)
assert.False(t, stream.sent[1].InitialSyncComplete)
assert.Len(t, stream.sent[2].Mapping, 1)
assert.True(t, stream.sent[2].InitialSyncComplete)
// All 2 acks consumed.
assert.Equal(t, 2, stream.recvIdx)
}
func TestSendSnapshotSync_SingleBatchNoAckNeeded(t *testing.T) {
const cluster = "cluster.example.com"
const batchSize = 100
const totalServices = 5
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
// No acks needed — single batch is also the last.
stream := &syncRecordingStream{}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.NoError(t, err)
require.Len(t, stream.sent, 1)
assert.Len(t, stream.sent[0].Mapping, totalServices)
assert.True(t, stream.sent[0].InitialSyncComplete)
assert.Equal(t, 0, stream.recvIdx, "no acks should be consumed for a single batch")
}
func TestSendSnapshotSync_EmptySnapshot(t *testing.T) {
const cluster = "cluster.example.com"
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(nil, nil)
s := newSnapshotTestServer(t, 500)
s.serviceManager = mgr
stream := &syncRecordingStream{}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.NoError(t, err)
require.Len(t, stream.sent, 1, "empty snapshot must still send sync-complete")
assert.Empty(t, stream.sent[0].Mapping)
assert.True(t, stream.sent[0].InitialSyncComplete)
}
func TestSendSnapshotSync_MissingAckReturnsError(t *testing.T) {
const cluster = "cluster.example.com"
const batchSize = 2
const totalServices = 4 // 2 batches → 1 ack needed, but we provide none
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
// No acks available — Recv will return error.
stream := &syncRecordingStream{}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.Error(t, err)
assert.Contains(t, err.Error(), "receive ack")
// First batch should have been sent before the error.
require.Len(t, stream.sent, 1)
}
func TestSendSnapshotSync_WrongMessageInsteadOfAck(t *testing.T) {
const cluster = "cluster.example.com"
const batchSize = 2
const totalServices = 4
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
// Send an init message instead of an ack.
stream := &syncRecordingStream{
recvMsgs: []*proto.SyncMappingsRequest{
{Msg: &proto.SyncMappingsRequest_Init{Init: &proto.SyncMappingsInit{ProxyId: "bad"}}},
},
}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.Error(t, err)
assert.Contains(t, err.Error(), "expected ack")
}
func TestSendSnapshotSync_BackPressureOrdering(t *testing.T) {
// Verify batches are sent strictly sequentially — batch N+1 is not sent
// until the ack for batch N is received.
const cluster = "cluster.example.com"
const batchSize = 2
const totalServices = 6 // 3 batches, 2 acks
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
var mu sync.Mutex
var events []string
// Build a stream that logs send/recv events so we can verify ordering.
ackCh := make(chan struct{}, 2)
stream := &orderTrackingStream{
mu: &mu,
events: &events,
ackCh: ackCh,
}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
// Feed acks asynchronously after a short delay to simulate real proxy.
go func() {
for range 2 {
time.Sleep(10 * time.Millisecond)
ackCh <- struct{}{}
}
}()
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.NoError(t, err)
mu.Lock()
defer mu.Unlock()
// Expected: send, recv-ack, send, recv-ack, send (last batch, no ack needed).
require.Len(t, events, 5)
assert.Equal(t, "send", events[0])
assert.Equal(t, "recv", events[1])
assert.Equal(t, "send", events[2])
assert.Equal(t, "recv", events[3])
assert.Equal(t, "send", events[4])
}
// orderTrackingStream logs "send" and "recv" events and blocks Recv until
// an ack is signaled via ackCh.
type orderTrackingStream struct {
grpc.ServerStream
mu *sync.Mutex
events *[]string
ackCh chan struct{}
}
func (s *orderTrackingStream) Send(_ *proto.SyncMappingsResponse) error {
s.mu.Lock()
*s.events = append(*s.events, "send")
s.mu.Unlock()
return nil
}
func (s *orderTrackingStream) Recv() (*proto.SyncMappingsRequest, error) {
<-s.ackCh
s.mu.Lock()
*s.events = append(*s.events, "recv")
s.mu.Unlock()
return ackMsg(), nil
}
func (s *orderTrackingStream) Context() context.Context { return context.Background() }
func (s *orderTrackingStream) SetHeader(metadata.MD) error { return nil }
func (s *orderTrackingStream) SendHeader(metadata.MD) error { return nil }
func (s *orderTrackingStream) SetTrailer(metadata.MD) {}
func (s *orderTrackingStream) SendMsg(any) error { return nil }
func (s *orderTrackingStream) RecvMsg(any) error { return nil }
func TestSendSnapshotSync_TokensGeneratedPerBatch(t *testing.T) {
const cluster = "cluster.example.com"
const batchSize = 2
const totalServices = 4
const ttl = 100 * time.Millisecond
const ackDelay = 200 * time.Millisecond
ctrl := gomock.NewController(t)
mgr := rpservice.NewMockManager(ctrl)
mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil)
s := newSnapshotTestServer(t, batchSize)
s.serviceManager = mgr
s.tokenTTL = ttl
// Build a stream that validates tokens immediately on Send, then
// delays the ack to ensure the next batch's tokens are generated fresh.
var validateErrs []error
ackCh := make(chan struct{}, 1)
stream := &tokenValidatingSyncStream{
tokenStore: s.tokenStore,
validateErrs: &validateErrs,
ackCh: ackCh,
}
conn := &proxyConnection{
proxyID: "proxy-a",
address: cluster,
syncStream: stream,
}
go func() {
// Delay ack so that if tokens were all generated upfront they'd expire.
time.Sleep(ackDelay)
ackCh <- struct{}{}
}()
err := s.sendSnapshotSync(context.Background(), conn, stream)
require.NoError(t, err)
require.Empty(t, validateErrs,
"tokens must remain valid: per-batch generation guarantees freshness")
}
type tokenValidatingSyncStream struct {
grpc.ServerStream
tokenStore *OneTimeTokenStore
validateErrs *[]error
ackCh chan struct{}
}
func (s *tokenValidatingSyncStream) Send(m *proto.SyncMappingsResponse) error {
for _, mapping := range m.Mapping {
if err := s.tokenStore.ValidateAndConsume(mapping.AuthToken, mapping.AccountId, mapping.Id); err != nil {
*s.validateErrs = append(*s.validateErrs, fmt.Errorf("svc %s: %w", mapping.Id, err))
}
}
return nil
}
func (s *tokenValidatingSyncStream) Recv() (*proto.SyncMappingsRequest, error) {
<-s.ackCh
return ackMsg(), nil
}
func (s *tokenValidatingSyncStream) Context() context.Context { return context.Background() }
func (s *tokenValidatingSyncStream) SetHeader(metadata.MD) error { return nil }
func (s *tokenValidatingSyncStream) SendHeader(metadata.MD) error { return nil }
func (s *tokenValidatingSyncStream) SetTrailer(metadata.MD) {}
func (s *tokenValidatingSyncStream) SendMsg(any) error { return nil }
func (s *tokenValidatingSyncStream) RecvMsg(any) error { return nil }
func TestConnectionSendResponse_RoutesToSyncStream(t *testing.T) {
stream := &syncRecordingStream{}
conn := &proxyConnection{
syncStream: stream,
}
resp := &proto.GetMappingUpdateResponse{
Mapping: []*proto.ProxyMapping{
{Id: "svc-1", AccountId: "acct-1", Domain: "example.com"},
},
InitialSyncComplete: true,
}
err := conn.sendResponse(resp)
require.NoError(t, err)
require.Len(t, stream.sent, 1)
assert.Len(t, stream.sent[0].Mapping, 1)
assert.Equal(t, "svc-1", stream.sent[0].Mapping[0].Id)
assert.True(t, stream.sent[0].InitialSyncComplete)
}
func TestConnectionSendResponse_RoutesToLegacyStream(t *testing.T) {
stream := &recordingStream{}
conn := &proxyConnection{
stream: stream,
}
resp := &proto.GetMappingUpdateResponse{
Mapping: []*proto.ProxyMapping{
{Id: "svc-2", AccountId: "acct-2"},
},
}
err := conn.sendResponse(resp)
require.NoError(t, err)
require.Len(t, stream.messages, 1)
assert.Equal(t, "svc-2", stream.messages[0].Mapping[0].Id)
}