From ddc4c20a31b1845ac6e18efb8d4843dbe3456151 Mon Sep 17 00:00:00 2001 From: pascal Date: Mon, 18 May 2026 15:44:32 +0200 Subject: [PATCH] apply snapshot mappings concurrently while maintaining management backpressure --- management/internals/shared/grpc/proxy.go | 247 ++++++++- .../shared/grpc/sync_mappings_test.go | 405 ++++++++++++++ proxy/internal/roundtrip/netbird.go | 53 +- proxy/process_mappings_bench_test.go | 300 +++++++++++ proxy/server.go | 217 +++++++- proxy/sync_mappings_test.go | 510 ++++++++++++++++++ shared/management/proto/proxy_service.pb.go | 489 ++++++++++++++--- shared/management/proto/proxy_service.proto | 41 ++ .../management/proto/proxy_service_grpc.pb.go | 82 +++ 9 files changed, 2246 insertions(+), 98 deletions(-) create mode 100644 management/internals/shared/grpc/sync_mappings_test.go create mode 100644 proxy/process_mappings_bench_test.go create mode 100644 proxy/sync_mappings_test.go diff --git a/management/internals/shared/grpc/proxy.go b/management/internals/shared/grpc/proxy.go index eada2d86a..cd3d517ca 100644 --- a/management/internals/shared/grpc/proxy.go +++ b/management/internals/shared/grpc/proxy.go @@ -136,9 +136,12 @@ type proxyConnection struct { tokenID string capabilities *proto.ProxyCapabilities stream proto.ProxyService_GetMappingUpdateServer - sendChan chan *proto.GetMappingUpdateResponse - ctx context.Context - cancel context.CancelFunc + // syncStream is set when the proxy connected via SyncMappings. + // When non-nil, the sender goroutine uses this instead of stream. + syncStream proto.ProxyService_SyncMappingsServer + sendChan chan *proto.GetMappingUpdateResponse + ctx context.Context + cancel context.CancelFunc } func enforceAccountScope(ctx context.Context, requestAccountID string) error { @@ -345,6 +348,223 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest } } +// SyncMappings implements the bidirectional SyncMappings RPC. +// It mirrors GetMappingUpdate but provides application-level back-pressure: +// management waits for an ack from the proxy before sending the next batch. +func (s *ProxyServiceServer) SyncMappings(stream proto.ProxyService_SyncMappingsServer) error { + ctx := stream.Context() + + peerInfo := PeerIPFromContext(ctx) + log.Infof("New proxy SyncMappings connection from %s", peerInfo) + + // First message must be init. + firstMsg, err := stream.Recv() + if err != nil { + return status.Errorf(codes.Internal, "receive init: %v", err) + } + init := firstMsg.GetInit() + if init == nil { + return status.Errorf(codes.InvalidArgument, "first message must be init") + } + + proxyID := init.GetProxyId() + if proxyID == "" { + return status.Errorf(codes.InvalidArgument, "proxy_id is required") + } + + proxyAddress := init.GetAddress() + if !isProxyAddressValid(proxyAddress) { + return status.Errorf(codes.InvalidArgument, "proxy address is invalid") + } + + var accountID *string + token := GetProxyTokenFromContext(ctx) + if token != nil && token.AccountID != nil { + accountID = token.AccountID + + available, err := s.proxyManager.IsClusterAddressAvailable(ctx, proxyAddress, *accountID) + if err != nil { + return status.Errorf(codes.Internal, "check cluster address: %v", err) + } + if !available { + return status.Errorf(codes.AlreadyExists, "cluster address %s is already in use", proxyAddress) + } + } + + var tokenID string + if token != nil { + tokenID = token.ID + } + + sessionID := uuid.NewString() + + if old, loaded := s.connectedProxies.Load(proxyID); loaded { + oldConn := old.(*proxyConnection) + log.WithFields(log.Fields{ + "proxy_id": proxyID, + "old_session_id": oldConn.sessionID, + "new_session_id": sessionID, + }).Info("Superseding existing proxy connection") + oldConn.cancel() + } + + connCtx, cancel := context.WithCancel(ctx) + conn := &proxyConnection{ + proxyID: proxyID, + sessionID: sessionID, + address: proxyAddress, + accountID: accountID, + tokenID: tokenID, + capabilities: init.GetCapabilities(), + syncStream: stream, + sendChan: make(chan *proto.GetMappingUpdateResponse, 100), + ctx: connCtx, + cancel: cancel, + } + + var caps *proxy.Capabilities + if c := init.GetCapabilities(); c != nil { + caps = &proxy.Capabilities{ + SupportsCustomPorts: c.SupportsCustomPorts, + RequireSubdomain: c.RequireSubdomain, + SupportsCrowdsec: c.SupportsCrowdsec, + } + } + proxyRecord, err := s.proxyManager.Connect(ctx, proxyID, sessionID, proxyAddress, peerInfo, accountID, caps) + if err != nil { + cancel() + if accountID != nil { + return status.Errorf(codes.Internal, "failed to register BYOP proxy: %v", err) + } + log.WithContext(ctx).Warnf("failed to register proxy %s in database: %v", proxyID, err) + return status.Errorf(codes.Internal, "register proxy in database: %v", err) + } + + s.connectedProxies.Store(proxyID, conn) + if err := s.proxyController.RegisterProxyToCluster(ctx, conn.address, proxyID); err != nil { + log.WithContext(ctx).Warnf("Failed to register proxy %s in cluster: %v", proxyID, err) + } + + if err := s.sendSnapshotSync(ctx, conn, stream); err != nil { + if s.connectedProxies.CompareAndDelete(proxyID, conn) { + if unregErr := s.proxyController.UnregisterProxyFromCluster(context.Background(), conn.address, proxyID); unregErr != nil { + log.WithContext(ctx).Debugf("cleanup after snapshot failure for proxy %s: %v", proxyID, unregErr) + } + } + cancel() + if disconnErr := s.proxyManager.Disconnect(context.Background(), proxyID, sessionID); disconnErr != nil { + log.WithContext(ctx).Debugf("cleanup after snapshot failure for proxy %s: %v", proxyID, disconnErr) + } + return fmt.Errorf("send snapshot to proxy %s: %w", proxyID, err) + } + + errChan := make(chan error, 2) + go s.sender(conn, errChan) + + // Drain acks from the proxy in the background so the stream stays healthy. + // After the snapshot phase, the proxy may still send acks for incremental + // updates; we simply discard them. + go func() { + for { + if _, err := stream.Recv(); err != nil { + errChan <- err + return + } + } + }() + + log.WithFields(log.Fields{ + "proxy_id": proxyID, + "session_id": sessionID, + "address": proxyAddress, + "cluster_addr": proxyAddress, + "account_id": accountID, + "total_proxies": len(s.GetConnectedProxies()), + }).Info("Proxy registered in cluster (SyncMappings)") + defer func() { + if !s.connectedProxies.CompareAndDelete(proxyID, conn) { + log.Infof("Proxy %s session %s: skipping cleanup, superseded by new connection", proxyID, sessionID) + cancel() + return + } + + if err := s.proxyController.UnregisterProxyFromCluster(context.Background(), conn.address, proxyID); err != nil { + log.Warnf("Failed to unregister proxy %s from cluster: %v", proxyID, err) + } + if err := s.proxyManager.Disconnect(context.Background(), proxyID, sessionID); err != nil { + log.Warnf("Failed to mark proxy %s as disconnected: %v", proxyID, err) + } + + cancel() + log.Infof("Proxy %s session %s disconnected", proxyID, sessionID) + }() + + go s.heartbeat(connCtx, conn, proxyRecord) + + select { + case err := <-errChan: + log.WithContext(ctx).Warnf("Failed to send update: %v", err) + return fmt.Errorf("send update to proxy %s: %w", proxyID, err) + case <-connCtx.Done(): + log.WithContext(ctx).Infof("Proxy %s context canceled", proxyID) + return connCtx.Err() + } +} + +// sendSnapshotSync sends the initial snapshot with back-pressure: it sends +// one batch, then waits for the proxy to ack before sending the next. +func (s *ProxyServiceServer) sendSnapshotSync(ctx context.Context, conn *proxyConnection, stream proto.ProxyService_SyncMappingsServer) error { + if !isProxyAddressValid(conn.address) { + return fmt.Errorf("proxy address is invalid") + } + + mappings, err := s.snapshotServiceMappings(ctx, conn) + if err != nil { + return err + } + + for i := 0; i < len(mappings); i += s.snapshotBatchSize { + end := i + s.snapshotBatchSize + if end > len(mappings) { + end = len(mappings) + } + for _, m := range mappings[i:end] { + token, err := s.tokenStore.GenerateToken(m.AccountId, m.Id, s.proxyTokenTTL()) + if err != nil { + return fmt.Errorf("generate auth token for service %s: %w", m.Id, err) + } + m.AuthToken = token + } + if err := stream.Send(&proto.SyncMappingsResponse{ + Mapping: mappings[i:end], + InitialSyncComplete: end == len(mappings), + }); err != nil { + return fmt.Errorf("send snapshot batch: %w", err) + } + + // Wait for ack before sending the next batch. + if end < len(mappings) { + msg, err := stream.Recv() + if err != nil { + return fmt.Errorf("receive ack: %w", err) + } + if msg.GetAck() == nil { + return fmt.Errorf("expected ack, got %T", msg.GetMsg()) + } + } + } + + if len(mappings) == 0 { + if err := stream.Send(&proto.SyncMappingsResponse{ + InitialSyncComplete: true, + }); err != nil { + return fmt.Errorf("send snapshot completion: %w", err) + } + } + + return nil +} + // heartbeat updates the proxy's last_seen timestamp every minute and // disconnects the proxy if its access token has been revoked. func (s *ProxyServiceServer) heartbeat(ctx context.Context, conn *proxyConnection, p *proxy.Proxy) { @@ -460,12 +680,14 @@ func isProxyAddressValid(addr string) bool { return err == nil } -// sender handles sending messages to proxy +// sender handles sending messages to proxy. +// When conn.syncStream is set the message is sent as SyncMappingsResponse; +// otherwise the legacy GetMappingUpdateResponse stream is used. func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error) { for { select { case resp := <-conn.sendChan: - if err := conn.stream.Send(resp); err != nil { + if err := conn.sendResponse(resp); err != nil { errChan <- err return } @@ -475,6 +697,17 @@ func (s *ProxyServiceServer) sender(conn *proxyConnection, errChan chan<- error) } } +// sendResponse sends a mapping update on whichever stream the proxy connected with. +func (conn *proxyConnection) sendResponse(resp *proto.GetMappingUpdateResponse) error { + if conn.syncStream != nil { + return conn.syncStream.Send(&proto.SyncMappingsResponse{ + Mapping: resp.Mapping, + InitialSyncComplete: resp.InitialSyncComplete, + }) + } + return conn.stream.Send(resp) +} + // SendAccessLog processes access log from proxy func (s *ProxyServiceServer) SendAccessLog(ctx context.Context, req *proto.SendAccessLogRequest) (*proto.SendAccessLogResponse, error) { accessLog := req.GetLog() @@ -541,8 +774,8 @@ func (s *ProxyServiceServer) SendServiceUpdate(update *proto.GetMappingUpdateRes return true } connUpdate = &proto.GetMappingUpdateResponse{ - Mapping: filtered, - InitialSyncComplete: update.InitialSyncComplete, + Mapping: filtered, + InitialSyncComplete: update.InitialSyncComplete, } } resp := s.perProxyMessage(connUpdate, conn.proxyID) diff --git a/management/internals/shared/grpc/sync_mappings_test.go b/management/internals/shared/grpc/sync_mappings_test.go new file mode 100644 index 000000000..f8361753c --- /dev/null +++ b/management/internals/shared/grpc/sync_mappings_test.go @@ -0,0 +1,405 @@ +package grpc + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + rpservice "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service" + "github.com/netbirdio/netbird/shared/management/proto" +) + +// syncRecordingStream is a mock ProxyService_SyncMappingsServer that records +// sent messages and returns pre-loaded ack responses from Recv. +type syncRecordingStream struct { + grpc.ServerStream + + mu sync.Mutex + sent []*proto.SyncMappingsResponse + recvMsgs []*proto.SyncMappingsRequest + recvIdx int +} + +func (s *syncRecordingStream) Send(m *proto.SyncMappingsResponse) error { + s.mu.Lock() + defer s.mu.Unlock() + s.sent = append(s.sent, m) + return nil +} + +func (s *syncRecordingStream) Recv() (*proto.SyncMappingsRequest, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.recvIdx >= len(s.recvMsgs) { + return nil, fmt.Errorf("no more recv messages") + } + msg := s.recvMsgs[s.recvIdx] + s.recvIdx++ + return msg, nil +} + +func (s *syncRecordingStream) Context() context.Context { return context.Background() } +func (s *syncRecordingStream) SetHeader(metadata.MD) error { return nil } +func (s *syncRecordingStream) SendHeader(metadata.MD) error { return nil } +func (s *syncRecordingStream) SetTrailer(metadata.MD) {} +func (s *syncRecordingStream) SendMsg(any) error { return nil } +func (s *syncRecordingStream) RecvMsg(any) error { return nil } + +func ackMsg() *proto.SyncMappingsRequest { + return &proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + } +} + +func TestSendSnapshotSync_BatchesWithAcks(t *testing.T) { + const cluster = "cluster.example.com" + const batchSize = 3 + const totalServices = 7 // 3 + 3 + 1 → 3 batches, 2 acks needed (not after last) + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + + // Provide 2 acks — one after each non-final batch. + stream := &syncRecordingStream{ + recvMsgs: []*proto.SyncMappingsRequest{ackMsg(), ackMsg()}, + } + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.NoError(t, err) + + require.Len(t, stream.sent, 3, "should send ceil(7/3) = 3 batches") + + assert.Len(t, stream.sent[0].Mapping, 3) + assert.False(t, stream.sent[0].InitialSyncComplete) + + assert.Len(t, stream.sent[1].Mapping, 3) + assert.False(t, stream.sent[1].InitialSyncComplete) + + assert.Len(t, stream.sent[2].Mapping, 1) + assert.True(t, stream.sent[2].InitialSyncComplete) + + // All 2 acks consumed. + assert.Equal(t, 2, stream.recvIdx) +} + +func TestSendSnapshotSync_SingleBatchNoAckNeeded(t *testing.T) { + const cluster = "cluster.example.com" + const batchSize = 100 + const totalServices = 5 + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + + // No acks needed — single batch is also the last. + stream := &syncRecordingStream{} + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.NoError(t, err) + + require.Len(t, stream.sent, 1) + assert.Len(t, stream.sent[0].Mapping, totalServices) + assert.True(t, stream.sent[0].InitialSyncComplete) + assert.Equal(t, 0, stream.recvIdx, "no acks should be consumed for a single batch") +} + +func TestSendSnapshotSync_EmptySnapshot(t *testing.T) { + const cluster = "cluster.example.com" + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(nil, nil) + + s := newSnapshotTestServer(t, 500) + s.serviceManager = mgr + + stream := &syncRecordingStream{} + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.NoError(t, err) + + require.Len(t, stream.sent, 1, "empty snapshot must still send sync-complete") + assert.Empty(t, stream.sent[0].Mapping) + assert.True(t, stream.sent[0].InitialSyncComplete) +} + +func TestSendSnapshotSync_MissingAckReturnsError(t *testing.T) { + const cluster = "cluster.example.com" + const batchSize = 2 + const totalServices = 4 // 2 batches → 1 ack needed, but we provide none + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + + // No acks available — Recv will return error. + stream := &syncRecordingStream{} + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.Error(t, err) + assert.Contains(t, err.Error(), "receive ack") + // First batch should have been sent before the error. + require.Len(t, stream.sent, 1) +} + +func TestSendSnapshotSync_WrongMessageInsteadOfAck(t *testing.T) { + const cluster = "cluster.example.com" + const batchSize = 2 + const totalServices = 4 + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + + // Send an init message instead of an ack. + stream := &syncRecordingStream{ + recvMsgs: []*proto.SyncMappingsRequest{ + {Msg: &proto.SyncMappingsRequest_Init{Init: &proto.SyncMappingsInit{ProxyId: "bad"}}}, + }, + } + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected ack") +} + +func TestSendSnapshotSync_BackPressureOrdering(t *testing.T) { + // Verify batches are sent strictly sequentially — batch N+1 is not sent + // until the ack for batch N is received. + const cluster = "cluster.example.com" + const batchSize = 2 + const totalServices = 6 // 3 batches, 2 acks + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + + var mu sync.Mutex + var events []string + + // Build a stream that logs send/recv events so we can verify ordering. + ackCh := make(chan struct{}, 2) + stream := &orderTrackingStream{ + mu: &mu, + events: &events, + ackCh: ackCh, + } + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + // Feed acks asynchronously after a short delay to simulate real proxy. + go func() { + for range 2 { + time.Sleep(10 * time.Millisecond) + ackCh <- struct{}{} + } + }() + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + + // Expected: send, recv-ack, send, recv-ack, send (last batch, no ack needed). + require.Len(t, events, 5) + assert.Equal(t, "send", events[0]) + assert.Equal(t, "recv", events[1]) + assert.Equal(t, "send", events[2]) + assert.Equal(t, "recv", events[3]) + assert.Equal(t, "send", events[4]) +} + +// orderTrackingStream logs "send" and "recv" events and blocks Recv until +// an ack is signaled via ackCh. +type orderTrackingStream struct { + grpc.ServerStream + mu *sync.Mutex + events *[]string + ackCh chan struct{} +} + +func (s *orderTrackingStream) Send(_ *proto.SyncMappingsResponse) error { + s.mu.Lock() + *s.events = append(*s.events, "send") + s.mu.Unlock() + return nil +} + +func (s *orderTrackingStream) Recv() (*proto.SyncMappingsRequest, error) { + <-s.ackCh + s.mu.Lock() + *s.events = append(*s.events, "recv") + s.mu.Unlock() + return ackMsg(), nil +} + +func (s *orderTrackingStream) Context() context.Context { return context.Background() } +func (s *orderTrackingStream) SetHeader(metadata.MD) error { return nil } +func (s *orderTrackingStream) SendHeader(metadata.MD) error { return nil } +func (s *orderTrackingStream) SetTrailer(metadata.MD) {} +func (s *orderTrackingStream) SendMsg(any) error { return nil } +func (s *orderTrackingStream) RecvMsg(any) error { return nil } + +func TestSendSnapshotSync_TokensGeneratedPerBatch(t *testing.T) { + const cluster = "cluster.example.com" + const batchSize = 2 + const totalServices = 4 + const ttl = 100 * time.Millisecond + const ackDelay = 200 * time.Millisecond + + ctrl := gomock.NewController(t) + mgr := rpservice.NewMockManager(ctrl) + mgr.EXPECT().GetGlobalServices(gomock.Any()).Return(makeServices(totalServices, cluster), nil) + + s := newSnapshotTestServer(t, batchSize) + s.serviceManager = mgr + s.tokenTTL = ttl + + // Build a stream that validates tokens immediately on Send, then + // delays the ack to ensure the next batch's tokens are generated fresh. + var validateErrs []error + ackCh := make(chan struct{}, 1) + stream := &tokenValidatingSyncStream{ + tokenStore: s.tokenStore, + validateErrs: &validateErrs, + ackCh: ackCh, + } + conn := &proxyConnection{ + proxyID: "proxy-a", + address: cluster, + syncStream: stream, + } + + go func() { + // Delay ack so that if tokens were all generated upfront they'd expire. + time.Sleep(ackDelay) + ackCh <- struct{}{} + }() + + err := s.sendSnapshotSync(context.Background(), conn, stream) + require.NoError(t, err) + require.Empty(t, validateErrs, + "tokens must remain valid: per-batch generation guarantees freshness") +} + +type tokenValidatingSyncStream struct { + grpc.ServerStream + tokenStore *OneTimeTokenStore + validateErrs *[]error + ackCh chan struct{} +} + +func (s *tokenValidatingSyncStream) Send(m *proto.SyncMappingsResponse) error { + for _, mapping := range m.Mapping { + if err := s.tokenStore.ValidateAndConsume(mapping.AuthToken, mapping.AccountId, mapping.Id); err != nil { + *s.validateErrs = append(*s.validateErrs, fmt.Errorf("svc %s: %w", mapping.Id, err)) + } + } + return nil +} + +func (s *tokenValidatingSyncStream) Recv() (*proto.SyncMappingsRequest, error) { + <-s.ackCh + return ackMsg(), nil +} + +func (s *tokenValidatingSyncStream) Context() context.Context { return context.Background() } +func (s *tokenValidatingSyncStream) SetHeader(metadata.MD) error { return nil } +func (s *tokenValidatingSyncStream) SendHeader(metadata.MD) error { return nil } +func (s *tokenValidatingSyncStream) SetTrailer(metadata.MD) {} +func (s *tokenValidatingSyncStream) SendMsg(any) error { return nil } +func (s *tokenValidatingSyncStream) RecvMsg(any) error { return nil } + +func TestConnectionSendResponse_RoutesToSyncStream(t *testing.T) { + stream := &syncRecordingStream{} + conn := &proxyConnection{ + syncStream: stream, + } + + resp := &proto.GetMappingUpdateResponse{ + Mapping: []*proto.ProxyMapping{ + {Id: "svc-1", AccountId: "acct-1", Domain: "example.com"}, + }, + InitialSyncComplete: true, + } + + err := conn.sendResponse(resp) + require.NoError(t, err) + + require.Len(t, stream.sent, 1) + assert.Len(t, stream.sent[0].Mapping, 1) + assert.Equal(t, "svc-1", stream.sent[0].Mapping[0].Id) + assert.True(t, stream.sent[0].InitialSyncComplete) +} + +func TestConnectionSendResponse_RoutesToLegacyStream(t *testing.T) { + stream := &recordingStream{} + conn := &proxyConnection{ + stream: stream, + } + + resp := &proto.GetMappingUpdateResponse{ + Mapping: []*proto.ProxyMapping{ + {Id: "svc-2", AccountId: "acct-2"}, + }, + } + + err := conn.sendResponse(resp) + require.NoError(t, err) + + require.Len(t, stream.messages, 1) + assert.Equal(t, "svc-2", stream.messages[0].Mapping[0].Id) +} diff --git a/proxy/internal/roundtrip/netbird.go b/proxy/internal/roundtrip/netbird.go index e38e3dc4e..ce9e8ddf3 100644 --- a/proxy/internal/roundtrip/netbird.go +++ b/proxy/internal/roundtrip/netbird.go @@ -76,6 +76,11 @@ type clientEntry struct { services map[ServiceKey]serviceInfo createdAt time.Time started bool + // ready is closed once the client has been fully initialized. + // Callers that find a pending entry wait on this channel before + // accessing the client. A nil initErr means success. + ready chan struct{} + initErr error // Per-backend in-flight limiting keyed by target host:port. // TODO: clean up stale entries when backend targets change. inflightMu sync.Mutex @@ -157,6 +162,9 @@ type skipTLSVerifyContextKey struct{} // AddPeer registers a service for an account. If the account doesn't have a client yet, // one is created by authenticating with the management server using the provided token. // Multiple services can share the same client. +// +// Client creation (WG keygen, gRPC, embed.New) runs without holding clientsMux +// so that concurrent AddPeer calls for different accounts execute in parallel. func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key ServiceKey, authToken string, serviceID types.ServiceID) error { si := serviceInfo{serviceID: serviceID} @@ -164,10 +172,23 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se entry, exists := n.clients[accountID] if exists { + ready := entry.ready entry.services[key] = si started := entry.started n.clientsMux.Unlock() + // If the entry is still being initialized by another goroutine, wait. + if ready != nil { + select { + case <-ready: + case <-ctx.Done(): + return ctx.Err() + } + if entry.initErr != nil { + return fmt.Errorf("peer initialization failed: %w", entry.initErr) + } + } + n.logger.WithFields(log.Fields{ "account_id": accountID, "service_key": key, @@ -184,15 +205,39 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se return nil } - entry, err := n.createClientEntry(ctx, accountID, key, authToken, si) + // Insert a placeholder so other goroutines calling AddPeer for the same + // account will wait on the ready channel instead of starting a second + // client creation. + entry = &clientEntry{ + services: map[ServiceKey]serviceInfo{key: si}, + ready: make(chan struct{}), + } + n.clients[accountID] = entry + n.clientsMux.Unlock() + + created, err := n.createClientEntry(ctx, accountID, key, authToken, si) if err != nil { + entry.initErr = err + close(entry.ready) + + n.clientsMux.Lock() + delete(n.clients, accountID) n.clientsMux.Unlock() return err } - n.clients[accountID] = entry + // Transfer any services that were registered by concurrent AddPeer calls + // while we were creating the client. + n.clientsMux.Lock() + for k, v := range entry.services { + created.services[k] = v + } + created.ready = nil + n.clients[accountID] = created n.clientsMux.Unlock() + close(entry.ready) + n.logger.WithFields(log.Fields{ "account_id": accountID, "service_key": key, @@ -200,13 +245,13 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se // Attempt to start the client in the background; if this fails we will // retry on the first request via RoundTrip. - go n.runClientStartup(ctx, accountID, entry.client) + go n.runClientStartup(ctx, accountID, created.client) return nil } // createClientEntry generates a WireGuard keypair, authenticates with management, -// and creates an embedded NetBird client. Must be called with clientsMux held. +// and creates an embedded NetBird client. func (n *NetBird) createClientEntry(ctx context.Context, accountID types.AccountID, key ServiceKey, authToken string, si serviceInfo) (*clientEntry, error) { serviceID := si.serviceID n.logger.WithFields(log.Fields{ diff --git a/proxy/process_mappings_bench_test.go b/proxy/process_mappings_bench_test.go new file mode 100644 index 000000000..ca0792590 --- /dev/null +++ b/proxy/process_mappings_bench_test.go @@ -0,0 +1,300 @@ +package proxy + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + + "github.com/netbirdio/netbird/proxy/internal/auth" + "github.com/netbirdio/netbird/proxy/internal/conntrack" + "github.com/netbirdio/netbird/proxy/internal/crowdsec" + proxymetrics "github.com/netbirdio/netbird/proxy/internal/metrics" + "github.com/netbirdio/netbird/proxy/internal/proxy" + "github.com/netbirdio/netbird/proxy/internal/roundtrip" + nbtcp "github.com/netbirdio/netbird/proxy/internal/tcp" + "github.com/netbirdio/netbird/proxy/internal/types" + udprelay "github.com/netbirdio/netbird/proxy/internal/udp" + "github.com/netbirdio/netbird/shared/management/proto" + + "go.opentelemetry.io/otel/metric/noop" +) + +// latencyMockClient simulates realistic gRPC latency for management calls. +type latencyMockClient struct { + proto.ProxyServiceClient + createPeerDelay time.Duration + statusUpdateDelay time.Duration +} + +func (m *latencyMockClient) SendStatusUpdate(ctx context.Context, _ *proto.SendStatusUpdateRequest, _ ...grpc.CallOption) (*proto.SendStatusUpdateResponse, error) { + if m.statusUpdateDelay > 0 { + select { + case <-time.After(m.statusUpdateDelay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return &proto.SendStatusUpdateResponse{}, nil +} + +func (m *latencyMockClient) CreateProxyPeer(ctx context.Context, _ *proto.CreateProxyPeerRequest, _ ...grpc.CallOption) (*proto.CreateProxyPeerResponse, error) { + if m.createPeerDelay > 0 { + select { + case <-time.After(m.createPeerDelay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return &proto.CreateProxyPeerResponse{Success: true}, nil +} + +type discardWriter struct{} + +func (discardWriter) Write(p []byte) (int, error) { return len(p), nil } + +func benchServerWithLatency(b *testing.B, createPeerDelay, statusDelay time.Duration) *Server { + b.Helper() + logger := log.New() + logger.SetLevel(log.FatalLevel) + logger.SetOutput(&discardWriter{}) + + meter, err := proxymetrics.New(context.Background(), noop.Meter{}) + if err != nil { + b.Fatal(err) + } + + mgmtClient := &latencyMockClient{ + createPeerDelay: createPeerDelay, + statusUpdateDelay: statusDelay, + } + + nb := roundtrip.NewNetBird("bench-proxy", "bench.test", + roundtrip.ClientConfig{MgmtAddr: "http://bench.test:9999"}, + logger, nil, mgmtClient) + + mainRouter := nbtcp.NewRouter(logger, func(accountID types.AccountID) (types.DialContextFunc, error) { + return (&net.Dialer{}).DialContext, nil + }, &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 443}) + + return &Server{ + Logger: logger, + mgmtClient: mgmtClient, + netbird: nb, + proxy: proxy.NewReverseProxy(nil, "auto", nil, logger), + auth: auth.NewMiddleware(logger, nil, nil), + mainRouter: mainRouter, + mainPort: 443, + meter: meter, + hijackTracker: conntrack.HijackTracker{}, + crowdsecRegistry: crowdsec.NewRegistry("", "", log.NewEntry(logger)), + crowdsecServices: make(map[types.ServiceID]bool), + lastMappings: make(map[types.ServiceID]*proto.ProxyMapping), + portRouters: make(map[uint16]*portRouter), + svcPorts: make(map[types.ServiceID][]uint16), + udpRelays: make(map[types.ServiceID]*udprelay.Relay), + } +} + +// generateHTTPMappings creates N HTTP-mode mappings with the given update type. +// All belong to a single account to share the embedded client. +func generateHTTPMappings(n int, updateType proto.ProxyMappingUpdateType) []*proto.ProxyMapping { + mappings := make([]*proto.ProxyMapping, n) + for i := range n { + mappings[i] = &proto.ProxyMapping{ + Type: updateType, + Id: fmt.Sprintf("svc-%d", i), + AccountId: "account-1", + Domain: fmt.Sprintf("svc-%d.bench.example.com", i), + Mode: "http", + Path: []*proto.PathMapping{ + { + Path: "/", + Target: fmt.Sprintf("http://10.0.%d.%d:8080", (i/256)%256, i%256), + }, + }, + Auth: &proto.Authentication{}, + } + } + return mappings +} + +// generateMultiAccountHTTPMappings creates N HTTP-mode CREATED mappings spread +// across the given number of accounts. This stresses the AddPeer new-account +// path which calls CreateProxyPeer + embed.New per unique account. +func generateMultiAccountHTTPMappings(n, accounts int) []*proto.ProxyMapping { + mappings := make([]*proto.ProxyMapping, n) + for i := range n { + mappings[i] = &proto.ProxyMapping{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED, + Id: fmt.Sprintf("svc-%d", i), + AccountId: fmt.Sprintf("account-%d", i%accounts), + Domain: fmt.Sprintf("svc-%d.bench.example.com", i), + Mode: "http", + Path: []*proto.PathMapping{ + { + Path: "/", + Target: fmt.Sprintf("http://10.0.%d.%d:8080", (i/256)%256, i%256), + }, + }, + Auth: &proto.Authentication{}, + } + } + return mappings +} + +// generateMixedMappings creates mappings with a realistic distribution: +// 70% HTTP create, 15% modify existing, 10% TLS on main port, 5% remove. +// All use a single account to avoid embed.New dialing. +func generateMixedMappings(n int) []*proto.ProxyMapping { + mappings := make([]*proto.ProxyMapping, n) + for i := range n { + var m *proto.ProxyMapping + switch { + case i%20 < 14: // 70% HTTP create + m = &proto.ProxyMapping{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED, + Id: fmt.Sprintf("svc-http-%d", i), + AccountId: "account-1", + Domain: fmt.Sprintf("svc-%d.bench.example.com", i), + Mode: "http", + Path: []*proto.PathMapping{ + {Path: "/", Target: fmt.Sprintf("http://10.0.%d.%d:8080", (i/256)%256, i%256)}, + {Path: "/api", Target: fmt.Sprintf("http://10.0.%d.%d:8081", (i/256)%256, i%256)}, + }, + Auth: &proto.Authentication{}, + } + case i%20 < 17: // 15% modify + m = &proto.ProxyMapping{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_MODIFIED, + Id: fmt.Sprintf("svc-http-%d", i%100), + AccountId: "account-1", + Domain: fmt.Sprintf("svc-%d.bench.example.com", i%100), + Mode: "http", + Path: []*proto.PathMapping{ + {Path: "/", Target: fmt.Sprintf("http://10.1.%d.%d:8080", (i/256)%256, i%256)}, + }, + Auth: &proto.Authentication{}, + } + case i%20 < 19: // 10% TLS passthrough on main port + m = &proto.ProxyMapping{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED, + Id: fmt.Sprintf("svc-tls-%d", i), + AccountId: "account-1", + Domain: fmt.Sprintf("tls-%d.bench.example.com", i), + Mode: "tls", + ListenPort: 443, + Path: []*proto.PathMapping{ + {Path: "/", Target: fmt.Sprintf("10.2.%d.%d:443", (i/256)%256, i%256)}, + }, + } + default: // 5% remove + m = &proto.ProxyMapping{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED, + Id: fmt.Sprintf("svc-http-%d", i%50), + AccountId: "account-1", + Domain: fmt.Sprintf("svc-%d.bench.example.com", i%50), + Mode: "http", + } + } + mappings[i] = m + } + return mappings +} + +const ( + createPeerLatency = 100 * time.Millisecond + statusUpdateLatency = 50 * time.Millisecond +) + +// BenchmarkProcessMappings_HTTPCreate_SingleAccount benchmarks the initial sync +// scenario: N HTTP mappings all on a single account. Only the first mapping +// triggers CreateProxyPeer (100ms gRPC). The rest just register with the +// existing client. This is the "best case" production path. +func BenchmarkProcessMappings_HTTPCreate_SingleAccount(b *testing.B) { + for _, n := range []int{100, 1000, 5000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + mappings := generateHTTPMappings(n, proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED) + for range b.N { + s := benchServerWithLatency(b, createPeerLatency, statusUpdateLatency) + s.processMappings(b.Context(), mappings) + } + }) + } +} + +// BenchmarkProcessMappings_HTTPCreate_MultiAccount benchmarks the worst-case +// initial sync: every mapping belongs to a different account, so each one +// triggers a full CreateProxyPeer gRPC round-trip (100ms) + embed.New. +// With 500 accounts this serializes to ~50s of blocking I/O. +func BenchmarkProcessMappings_HTTPCreate_MultiAccount(b *testing.B) { + for _, tc := range []struct { + mappings int + accounts int + }{ + {100, 10}, + {100, 50}, + {1000, 50}, + {1000, 200}, + {3000, 500}, + } { + b.Run(fmt.Sprintf("mappings=%d/accounts=%d", tc.mappings, tc.accounts), func(b *testing.B) { + mappings := generateMultiAccountHTTPMappings(tc.mappings, tc.accounts) + for range b.N { + s := benchServerWithLatency(b, createPeerLatency, statusUpdateLatency) + s.processMappings(b.Context(), mappings) + } + }) + } +} + +// BenchmarkProcessMappings_Mixed benchmarks a realistic mixed workload +// of creates, modifies, TLS, and removes with production-like latency. +// TLS mappings call SendStatusUpdate (50ms each), serialized. +func BenchmarkProcessMappings_Mixed(b *testing.B) { + for _, n := range []int{100, 1000, 5000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + mappings := generateMixedMappings(n) + for range b.N { + s := benchServerWithLatency(b, createPeerLatency, statusUpdateLatency) + creates := generateHTTPMappings(100, proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED) + s.processMappings(b.Context(), creates) + s.processMappings(b.Context(), mappings) + } + }) + } +} + +// BenchmarkProcessMappings_ModifyOnly benchmarks bulk modification of +// already-registered mappings (no new peers needed, no gRPC). +func BenchmarkProcessMappings_ModifyOnly(b *testing.B) { + for _, n := range []int{100, 1000, 5000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + creates := generateHTTPMappings(n, proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED) + modifies := generateHTTPMappings(n, proto.ProxyMappingUpdateType_UPDATE_TYPE_MODIFIED) + for range b.N { + s := benchServerWithLatency(b, createPeerLatency, statusUpdateLatency) + s.processMappings(b.Context(), creates) + s.processMappings(b.Context(), modifies) + } + }) + } +} + +// BenchmarkProcessMappings_NoLatency measures pure CPU/allocation overhead +// with zero I/O latency for profiling purposes. +func BenchmarkProcessMappings_NoLatency(b *testing.B) { + for _, n := range []int{1000, 5000} { + b.Run(fmt.Sprintf("n=%d", n), func(b *testing.B) { + mappings := generateHTTPMappings(n, proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED) + for range b.N { + s := benchServerWithLatency(b, 0, 0) + s.processMappings(b.Context(), mappings) + } + }) + } +} diff --git a/proxy/server.go b/proxy/server.go index 6980e1df1..775a650bf 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -32,9 +32,11 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "golang.org/x/exp/maps" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" + grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" "github.com/netbirdio/netbird/proxy/internal/accesslog" @@ -938,6 +940,9 @@ func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.Pr Clock: backoff.SystemClock, } + // syncSupported tracks whether management supports SyncMappings. + // Starts true; set to false on first Unimplemented error. + syncSupported := true initialSyncDone := false operation := func() error { @@ -949,36 +954,25 @@ func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.Pr s.healthChecker.SetManagementConnected(false) } - supportsCrowdSec := s.crowdsecRegistry.Available() - mappingClient, err := client.GetMappingUpdate(ctx, &proto.GetMappingUpdateRequest{ - ProxyId: s.ID, - Version: s.Version, - StartedAt: timestamppb.New(s.startTime), - Address: s.ProxyURL, - Capabilities: &proto.ProxyCapabilities{ - SupportsCustomPorts: &s.SupportsCustomPorts, - RequireSubdomain: &s.RequireSubdomain, - SupportsCrowdsec: &supportsCrowdSec, - }, - }) - if err != nil { - return fmt.Errorf("create mapping stream: %w", err) + var streamErr error + if syncSupported { + streamErr = s.trySyncMappings(ctx, client, &initialSyncDone) + if isSyncUnimplemented(streamErr) { + syncSupported = false + s.Logger.Info("management does not support SyncMappings, falling back to GetMappingUpdate") + streamErr = s.tryGetMappingUpdate(ctx, client, &initialSyncDone) + } + } else { + streamErr = s.tryGetMappingUpdate(ctx, client, &initialSyncDone) } - if s.healthChecker != nil { - s.healthChecker.SetManagementConnected(true) - } - s.Logger.Debug("management mapping stream established") - - // Stream established — reset backoff so the next failure retries quickly. - bo.Reset() - - streamErr := s.handleMappingStream(ctx, mappingClient, &initialSyncDone) - if s.healthChecker != nil { s.healthChecker.SetManagementConnected(false) } + // Stream established — reset backoff so the next failure retries quickly. + bo.Reset() + if streamErr == nil { return fmt.Errorf("stream closed by server") } @@ -995,6 +989,125 @@ func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.Pr } } +func (s *Server) proxyCapabilities() *proto.ProxyCapabilities { + supportsCrowdSec := s.crowdsecRegistry.Available() + return &proto.ProxyCapabilities{ + SupportsCustomPorts: &s.SupportsCustomPorts, + RequireSubdomain: &s.RequireSubdomain, + SupportsCrowdsec: &supportsCrowdSec, + } +} + +func (s *Server) tryGetMappingUpdate(ctx context.Context, client proto.ProxyServiceClient, initialSyncDone *bool) error { + mappingClient, err := client.GetMappingUpdate(ctx, &proto.GetMappingUpdateRequest{ + ProxyId: s.ID, + Version: s.Version, + StartedAt: timestamppb.New(s.startTime), + Address: s.ProxyURL, + Capabilities: s.proxyCapabilities(), + }) + if err != nil { + return fmt.Errorf("create mapping stream: %w", err) + } + + if s.healthChecker != nil { + s.healthChecker.SetManagementConnected(true) + } + s.Logger.Debug("management mapping stream established (GetMappingUpdate)") + + return s.handleMappingStream(ctx, mappingClient, initialSyncDone) +} + +func (s *Server) trySyncMappings(ctx context.Context, client proto.ProxyServiceClient, initialSyncDone *bool) error { + stream, err := client.SyncMappings(ctx) + if err != nil { + return fmt.Errorf("create sync stream: %w", err) + } + + // Send init message. + if err := stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: s.ID, + Version: s.Version, + StartedAt: timestamppb.New(s.startTime), + Address: s.ProxyURL, + Capabilities: s.proxyCapabilities(), + }, + }, + }); err != nil { + return fmt.Errorf("send sync init: %w", err) + } + + if s.healthChecker != nil { + s.healthChecker.SetManagementConnected(true) + } + s.Logger.Debug("management mapping stream established (SyncMappings)") + + return s.handleSyncMappingsStream(ctx, stream, initialSyncDone) +} + +func isSyncUnimplemented(err error) bool { + if err == nil { + return false + } + st, ok := grpcstatus.FromError(err) + return ok && st.Code() == codes.Unimplemented +} + +func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool) error { + select { + case <-s.routerReady: + case <-ctx.Done(): + return ctx.Err() + } + + var snapshotIDs map[types.ServiceID]struct{} + if !*initialSyncDone { + snapshotIDs = make(map[types.ServiceID]struct{}) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + msg, err := stream.Recv() + switch { + case errors.Is(err, io.EOF): + return nil + case err != nil: + return fmt.Errorf("receive msg: %w", err) + } + s.Logger.Debug("Received mapping update, starting processing") + s.processMappings(ctx, msg.GetMapping()) + s.Logger.Debug("Processing mapping update completed") + + // Send ack so management knows we're ready for the next batch. + if err := stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }); err != nil { + return fmt.Errorf("send ack: %w", err) + } + + if !*initialSyncDone { + for _, m := range msg.GetMapping() { + snapshotIDs[types.ServiceID(m.GetId())] = struct{}{} + } + if msg.GetInitialSyncComplete() { + s.reconcileSnapshot(ctx, snapshotIDs) + snapshotIDs = nil + if s.healthChecker != nil { + s.healthChecker.SetInitialSyncComplete() + } + *initialSyncDone = true + s.Logger.Info("Initial mapping sync complete") + } + } + } + } +} + func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool) error { select { case <-s.routerReady: @@ -1067,6 +1180,8 @@ func (s *Server) reconcileSnapshot(ctx context.Context, snapshotIDs map[types.Se } func (s *Server) processMappings(ctx context.Context, mappings []*proto.ProxyMapping) { + s.ensurePeers(ctx, mappings) + for _, mapping := range mappings { s.Logger.WithFields(log.Fields{ "type": mapping.GetType(), @@ -1100,6 +1215,60 @@ func (s *Server) processMappings(ctx context.Context, mappings []*proto.ProxyMap } } +// ensurePeers pre-creates NetBird peers for all unique accounts referenced by +// CREATED mappings. Peers for different accounts are created concurrently, +// which avoids serializing N×100ms gRPC round-trips during large initial syncs. +func (s *Server) ensurePeers(ctx context.Context, mappings []*proto.ProxyMapping) { + // Collect one representative mapping per account that needs a new peer. + type peerReq struct { + accountID types.AccountID + svcKey roundtrip.ServiceKey + authToken string + svcID types.ServiceID + } + seen := make(map[types.AccountID]struct{}) + var reqs []peerReq + for _, m := range mappings { + if m.GetType() != proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED { + continue + } + accountID := types.AccountID(m.GetAccountId()) + if _, ok := seen[accountID]; ok { + continue + } + seen[accountID] = struct{}{} + if s.netbird.HasClient(accountID) { + continue + } + reqs = append(reqs, peerReq{ + accountID: accountID, + svcKey: s.serviceKeyForMapping(m), + authToken: m.GetAuthToken(), + svcID: types.ServiceID(m.GetId()), + }) + } + + if len(reqs) <= 1 { + return + } + + var wg sync.WaitGroup + wg.Add(len(reqs)) + for _, r := range reqs { + go func() { + defer wg.Done() + if err := s.netbird.AddPeer(ctx, r.accountID, r.svcKey, r.authToken, r.svcID); err != nil { + s.Logger.WithFields(log.Fields{ + "account_id": r.accountID, + "service_id": r.svcID, + "error": err, + }).Warn("failed to pre-create peer for account") + } + }() + } + wg.Wait() +} + // addMapping registers a service mapping and starts the appropriate relay or routes. func (s *Server) addMapping(ctx context.Context, mapping *proto.ProxyMapping) error { accountID := types.AccountID(mapping.GetAccountId()) diff --git a/proxy/sync_mappings_test.go b/proxy/sync_mappings_test.go new file mode 100644 index 000000000..63a1d42f2 --- /dev/null +++ b/proxy/sync_mappings_test.go @@ -0,0 +1,510 @@ +package proxy + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + grpcstatus "google.golang.org/grpc/status" + + "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service" + "github.com/netbirdio/netbird/shared/management/proto" +) + +func TestIntegration_SyncMappings_HappyPath(t *testing.T) { + setup := setupIntegrationTest(t) + defer setup.cleanup() + + conn, err := grpc.NewClient(setup.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + // Send init. + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: "sync-proxy-1", + Version: "test-v1", + Address: "test.proxy.io", + }, + }, + }) + require.NoError(t, err) + + mappingsByID := make(map[string]*proto.ProxyMapping) + for { + msg, err := stream.Recv() + require.NoError(t, err) + for _, m := range msg.GetMapping() { + mappingsByID[m.GetId()] = m + } + + // Ack every batch. + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }) + require.NoError(t, err) + + if msg.GetInitialSyncComplete() { + break + } + } + + assert.Len(t, mappingsByID, 2, "Should receive 2 mappings") + + rp1 := mappingsByID["rp-1"] + require.NotNil(t, rp1) + assert.Equal(t, "app1.test.proxy.io", rp1.GetDomain()) + assert.Equal(t, "test-account-1", rp1.GetAccountId()) + assert.Equal(t, proto.ProxyMappingUpdateType_UPDATE_TYPE_CREATED, rp1.GetType()) + assert.NotEmpty(t, rp1.GetAuthToken(), "Should have auth token") + + rp2 := mappingsByID["rp-2"] + require.NotNil(t, rp2) + assert.Equal(t, "app2.test.proxy.io", rp2.GetDomain()) +} + +func TestIntegration_SyncMappings_BackPressure(t *testing.T) { + setup := setupIntegrationTest(t) + defer setup.cleanup() + + // Add more services so we get multiple batches. + addServicesToStore(t, setup, 20, "test.proxy.io") + + conn, err := grpc.NewClient(setup.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: "sync-proxy-backpressure", + Version: "test-v1", + Address: "test.proxy.io", + }, + }, + }) + require.NoError(t, err) + + // Record the ordering of events to verify back-pressure. + var mu sync.Mutex + var events []string + var totalMappings int + + for { + msg, err := stream.Recv() + require.NoError(t, err) + + mu.Lock() + events = append(events, "recv") + totalMappings += len(msg.GetMapping()) + mu.Unlock() + + // Simulate processing delay. + time.Sleep(50 * time.Millisecond) + + mu.Lock() + events = append(events, "ack") + mu.Unlock() + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }) + require.NoError(t, err) + + if msg.GetInitialSyncComplete() { + break + } + } + + // 2 original + 20 added = 22 services total. + assert.Equal(t, 22, totalMappings, "should receive all 22 mappings") + + // Events should alternate recv/ack — no two recvs in a row + // (management waits for ack before sending next). + mu.Lock() + defer mu.Unlock() + for i := 0; i < len(events)-1; i += 2 { + assert.Equal(t, "recv", events[i], "event %d should be recv", i) + if i+1 < len(events) { + assert.Equal(t, "ack", events[i+1], "event %d should be ack", i+1) + } + } +} + +func TestIntegration_SyncMappings_IncrementalUpdate(t *testing.T) { + setup := setupIntegrationTest(t) + defer setup.cleanup() + + conn, err := grpc.NewClient(setup.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: "sync-proxy-incremental", + Version: "test-v1", + Address: "test.proxy.io", + }, + }, + }) + require.NoError(t, err) + + // Drain initial snapshot. + for { + msg, err := stream.Recv() + require.NoError(t, err) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }) + require.NoError(t, err) + + if msg.GetInitialSyncComplete() { + break + } + } + + // Now send an incremental update via the management server. + setup.proxyService.SendServiceUpdate(&proto.GetMappingUpdateResponse{ + Mapping: []*proto.ProxyMapping{{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED, + Id: "rp-1", + AccountId: "test-account-1", + Domain: "app1.test.proxy.io", + }}, + }) + + // Receive the incremental update on the sync stream. + msg, err := stream.Recv() + require.NoError(t, err) + require.NotEmpty(t, msg.GetMapping()) + assert.Equal(t, "rp-1", msg.GetMapping()[0].GetId()) + assert.Equal(t, proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED, msg.GetMapping()[0].GetType()) +} + +func TestIntegration_SyncMappings_MixedProxyVersions(t *testing.T) { + setup := setupIntegrationTest(t) + defer setup.cleanup() + + conn, err := grpc.NewClient(setup.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Old proxy uses GetMappingUpdate. + legacyStream, err := client.GetMappingUpdate(ctx, &proto.GetMappingUpdateRequest{ + ProxyId: "legacy-proxy", + Version: "old-v1", + Address: "test.proxy.io", + }) + require.NoError(t, err) + + var legacyMappings []*proto.ProxyMapping + for { + msg, err := legacyStream.Recv() + require.NoError(t, err) + legacyMappings = append(legacyMappings, msg.GetMapping()...) + if msg.GetInitialSyncComplete() { + break + } + } + + // New proxy uses SyncMappings. + syncStream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + err = syncStream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: "new-proxy", + Version: "new-v2", + Address: "test.proxy.io", + }, + }, + }) + require.NoError(t, err) + + var syncMappings []*proto.ProxyMapping + for { + msg, err := syncStream.Recv() + require.NoError(t, err) + syncMappings = append(syncMappings, msg.GetMapping()...) + + err = syncStream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }) + require.NoError(t, err) + + if msg.GetInitialSyncComplete() { + break + } + } + + // Both should receive the same set of mappings. + assert.Equal(t, len(legacyMappings), len(syncMappings), + "legacy and sync proxies should receive the same number of mappings") + + legacyIDs := make(map[string]bool) + for _, m := range legacyMappings { + legacyIDs[m.GetId()] = true + } + for _, m := range syncMappings { + assert.True(t, legacyIDs[m.GetId()], + "mapping %s should be present in both streams", m.GetId()) + } + + // Both proxies should be connected. + proxies := setup.proxyService.GetConnectedProxies() + assert.Contains(t, proxies, "legacy-proxy") + assert.Contains(t, proxies, "new-proxy") + + // Both should receive incremental updates. + setup.proxyService.SendServiceUpdate(&proto.GetMappingUpdateResponse{ + Mapping: []*proto.ProxyMapping{{ + Type: proto.ProxyMappingUpdateType_UPDATE_TYPE_REMOVED, + Id: "rp-1", + AccountId: "test-account-1", + Domain: "app1.test.proxy.io", + }}, + }) + + // Legacy proxy receives via GetMappingUpdateResponse. + legacyMsg, err := legacyStream.Recv() + require.NoError(t, err) + assert.Equal(t, "rp-1", legacyMsg.GetMapping()[0].GetId()) + + // Sync proxy receives via SyncMappingsResponse. + syncMsg, err := syncStream.Recv() + require.NoError(t, err) + assert.Equal(t, "rp-1", syncMsg.GetMapping()[0].GetId()) +} + +func TestIntegration_SyncMappings_Reconnect(t *testing.T) { + setup := setupIntegrationTest(t) + defer setup.cleanup() + + conn, err := grpc.NewClient(setup.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + proxyID := "sync-proxy-reconnect" + + receiveMappings := func() []*proto.ProxyMapping { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: proxyID, + Version: "test-v1", + Address: "test.proxy.io", + }, + }, + }) + require.NoError(t, err) + + var mappings []*proto.ProxyMapping + for { + msg, err := stream.Recv() + require.NoError(t, err) + mappings = append(mappings, msg.GetMapping()...) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, + }) + require.NoError(t, err) + + if msg.GetInitialSyncComplete() { + break + } + } + return mappings + } + + first := receiveMappings() + time.Sleep(100 * time.Millisecond) + second := receiveMappings() + + assert.Equal(t, len(first), len(second), + "should receive same mappings on reconnect") + + firstIDs := make(map[string]bool) + for _, m := range first { + firstIDs[m.GetId()] = true + } + for _, m := range second { + assert.True(t, firstIDs[m.GetId()], + "mapping %s should be present in both connections", m.GetId()) + } +} + +// --- Fallback tests: old management returns Unimplemented --- + +// unimplementedProxyServer embeds UnimplementedProxyServiceServer so +// SyncMappings returns codes.Unimplemented while GetMappingUpdate works. +type unimplementedSyncServer struct { + proto.UnimplementedProxyServiceServer + getMappingCalls atomic.Int32 +} + +func (s *unimplementedSyncServer) GetMappingUpdate(_ *proto.GetMappingUpdateRequest, stream proto.ProxyService_GetMappingUpdateServer) error { + s.getMappingCalls.Add(1) + return stream.Send(&proto.GetMappingUpdateResponse{ + Mapping: []*proto.ProxyMapping{{Id: "svc-1", AccountId: "acct-1", Domain: "example.com"}}, + InitialSyncComplete: true, + }) +} + +func TestIntegration_FallbackToGetMappingUpdate(t *testing.T) { + // Start a gRPC server that does NOT implement SyncMappings. + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv := &unimplementedSyncServer{} + grpcServer := grpc.NewServer() + proto.RegisterProxyServiceServer(grpcServer, srv) + go func() { _ = grpcServer.Serve(lis) }() + defer grpcServer.GracefulStop() + + conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + client := proto.NewProxyServiceClient(conn) + + // Try SyncMappings — should get Unimplemented. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + stream, err := client.SyncMappings(ctx) + require.NoError(t, err) + + err = stream.Send(&proto.SyncMappingsRequest{ + Msg: &proto.SyncMappingsRequest_Init{ + Init: &proto.SyncMappingsInit{ + ProxyId: "test-proxy", + Address: "test.example.com", + }, + }, + }) + require.NoError(t, err) + + _, err = stream.Recv() + require.Error(t, err) + st, ok := grpcstatus.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.Unimplemented, st.Code(), + "unimplemented SyncMappings should return Unimplemented code") + + // isSyncUnimplemented should detect this. + assert.True(t, isSyncUnimplemented(err)) + + // The actual fallback: GetMappingUpdate should work. + legacyStream, err := client.GetMappingUpdate(ctx, &proto.GetMappingUpdateRequest{ + ProxyId: "test-proxy", + Address: "test.example.com", + }) + require.NoError(t, err) + + msg, err := legacyStream.Recv() + require.NoError(t, err) + assert.True(t, msg.GetInitialSyncComplete()) + assert.Len(t, msg.GetMapping(), 1) + assert.Equal(t, int32(1), srv.getMappingCalls.Load()) +} + +func TestIsSyncUnimplemented(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"nil error", nil, false}, + {"non-grpc error", errors.New("random"), false}, + {"grpc internal", grpcstatus.Error(codes.Internal, "fail"), false}, + {"grpc unavailable", grpcstatus.Error(codes.Unavailable, "fail"), false}, + {"grpc unimplemented", grpcstatus.Error(codes.Unimplemented, "method not found"), true}, + { + "wrapped unimplemented", + fmt.Errorf("create sync stream: %w", grpcstatus.Error(codes.Unimplemented, "nope")), + // grpc/status.FromError unwraps in recent versions of grpc-go. + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isSyncUnimplemented(tt.err)) + }) + } +} + +// addServicesToStore adds n extra services to the test store for the given cluster. +func addServicesToStore(t *testing.T, setup *integrationTestSetup, n int, cluster string) { + t.Helper() + ctx := context.Background() + for i := 0; i < n; i++ { + svc := &service.Service{ + ID: fmt.Sprintf("extra-svc-%d", i), + AccountID: "test-account-1", + Name: fmt.Sprintf("Extra Service %d", i), + Domain: fmt.Sprintf("extra-%d.test.proxy.io", i), + ProxyCluster: cluster, + Enabled: true, + Targets: []*service.Target{{ + Path: strPtr("/"), + Host: fmt.Sprintf("10.0.1.%d", i%256), + Port: 8080, + Protocol: "http", + TargetId: fmt.Sprintf("peer-extra-%d", i), + TargetType: "peer", + Enabled: true, + }}, + } + require.NoError(t, setup.store.CreateService(ctx, svc)) + } +} diff --git a/shared/management/proto/proxy_service.pb.go b/shared/management/proto/proxy_service.pb.go index 1095b6411..8de9de7d2 100644 --- a/shared/management/proto/proxy_service.pb.go +++ b/shared/management/proto/proxy_service.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.26.0 -// protoc v7.34.1 +// protoc v6.33.1 // source: proxy_service.proto package proto @@ -1970,6 +1970,269 @@ func (x *ValidateSessionResponse) GetDeniedReason() string { return "" } +// SyncMappingsRequest is sent by the proxy on the bidirectional SyncMappings +// stream. The first message MUST be an init; all subsequent messages MUST be +// acks. +type SyncMappingsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Msg: + // + // *SyncMappingsRequest_Init + // *SyncMappingsRequest_Ack + Msg isSyncMappingsRequest_Msg `protobuf_oneof:"msg"` +} + +func (x *SyncMappingsRequest) Reset() { + *x = SyncMappingsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_service_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SyncMappingsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SyncMappingsRequest) ProtoMessage() {} + +func (x *SyncMappingsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proxy_service_proto_msgTypes[25] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SyncMappingsRequest.ProtoReflect.Descriptor instead. +func (*SyncMappingsRequest) Descriptor() ([]byte, []int) { + return file_proxy_service_proto_rawDescGZIP(), []int{25} +} + +func (m *SyncMappingsRequest) GetMsg() isSyncMappingsRequest_Msg { + if m != nil { + return m.Msg + } + return nil +} + +func (x *SyncMappingsRequest) GetInit() *SyncMappingsInit { + if x, ok := x.GetMsg().(*SyncMappingsRequest_Init); ok { + return x.Init + } + return nil +} + +func (x *SyncMappingsRequest) GetAck() *SyncMappingsAck { + if x, ok := x.GetMsg().(*SyncMappingsRequest_Ack); ok { + return x.Ack + } + return nil +} + +type isSyncMappingsRequest_Msg interface { + isSyncMappingsRequest_Msg() +} + +type SyncMappingsRequest_Init struct { + Init *SyncMappingsInit `protobuf:"bytes,1,opt,name=init,proto3,oneof"` +} + +type SyncMappingsRequest_Ack struct { + Ack *SyncMappingsAck `protobuf:"bytes,2,opt,name=ack,proto3,oneof"` +} + +func (*SyncMappingsRequest_Init) isSyncMappingsRequest_Msg() {} + +func (*SyncMappingsRequest_Ack) isSyncMappingsRequest_Msg() {} + +// SyncMappingsInit is the first message on the stream, carrying the same +// identification fields as GetMappingUpdateRequest. +type SyncMappingsInit struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProxyId string `protobuf:"bytes,1,opt,name=proxy_id,json=proxyId,proto3" json:"proxy_id,omitempty"` + Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` + StartedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + Address string `protobuf:"bytes,4,opt,name=address,proto3" json:"address,omitempty"` + Capabilities *ProxyCapabilities `protobuf:"bytes,5,opt,name=capabilities,proto3" json:"capabilities,omitempty"` +} + +func (x *SyncMappingsInit) Reset() { + *x = SyncMappingsInit{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_service_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SyncMappingsInit) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SyncMappingsInit) ProtoMessage() {} + +func (x *SyncMappingsInit) ProtoReflect() protoreflect.Message { + mi := &file_proxy_service_proto_msgTypes[26] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SyncMappingsInit.ProtoReflect.Descriptor instead. +func (*SyncMappingsInit) Descriptor() ([]byte, []int) { + return file_proxy_service_proto_rawDescGZIP(), []int{26} +} + +func (x *SyncMappingsInit) GetProxyId() string { + if x != nil { + return x.ProxyId + } + return "" +} + +func (x *SyncMappingsInit) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +func (x *SyncMappingsInit) GetStartedAt() *timestamppb.Timestamp { + if x != nil { + return x.StartedAt + } + return nil +} + +func (x *SyncMappingsInit) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *SyncMappingsInit) GetCapabilities() *ProxyCapabilities { + if x != nil { + return x.Capabilities + } + return nil +} + +// SyncMappingsAck is sent by the proxy after it has fully processed a batch. +// Management waits for this before sending the next batch. +type SyncMappingsAck struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SyncMappingsAck) Reset() { + *x = SyncMappingsAck{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_service_proto_msgTypes[27] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SyncMappingsAck) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SyncMappingsAck) ProtoMessage() {} + +func (x *SyncMappingsAck) ProtoReflect() protoreflect.Message { + mi := &file_proxy_service_proto_msgTypes[27] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SyncMappingsAck.ProtoReflect.Descriptor instead. +func (*SyncMappingsAck) Descriptor() ([]byte, []int) { + return file_proxy_service_proto_rawDescGZIP(), []int{27} +} + +// SyncMappingsResponse is a batch of mappings sent by management. +// Identical semantics to GetMappingUpdateResponse. +type SyncMappingsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Mapping []*ProxyMapping `protobuf:"bytes,1,rep,name=mapping,proto3" json:"mapping,omitempty"` + // initial_sync_complete is set on the last message of the initial snapshot. + InitialSyncComplete bool `protobuf:"varint,2,opt,name=initial_sync_complete,json=initialSyncComplete,proto3" json:"initial_sync_complete,omitempty"` +} + +func (x *SyncMappingsResponse) Reset() { + *x = SyncMappingsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_service_proto_msgTypes[28] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SyncMappingsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SyncMappingsResponse) ProtoMessage() {} + +func (x *SyncMappingsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_service_proto_msgTypes[28] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SyncMappingsResponse.ProtoReflect.Descriptor instead. +func (*SyncMappingsResponse) Descriptor() ([]byte, []int) { + return file_proxy_service_proto_rawDescGZIP(), []int{28} +} + +func (x *SyncMappingsResponse) GetMapping() []*ProxyMapping { + if x != nil { + return x.Mapping + } + return nil +} + +func (x *SyncMappingsResponse) GetInitialSyncComplete() bool { + if x != nil { + return x.InitialSyncComplete + } + return false +} + var File_proxy_service_proto protoreflect.FileDescriptor var file_proxy_service_proto_rawDesc = []byte{ @@ -2254,37 +2517,74 @@ var file_proxy_service_proto_rawDesc = []byte{ 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x73, 0x65, 0x72, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x65, 0x6e, 0x69, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6e, 0x69, 0x65, 0x64, - 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x2a, 0x64, 0x0a, 0x16, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x4d, - 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x17, 0x0a, 0x13, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, - 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x00, 0x12, 0x18, 0x0a, 0x14, 0x55, 0x50, 0x44, - 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x46, 0x0a, 0x0f, - 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x77, 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x12, - 0x18, 0x0a, 0x14, 0x50, 0x41, 0x54, 0x48, 0x5f, 0x52, 0x45, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, - 0x44, 0x45, 0x46, 0x41, 0x55, 0x4c, 0x54, 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x50, 0x41, 0x54, - 0x48, 0x5f, 0x52, 0x45, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x50, 0x52, 0x45, 0x53, 0x45, 0x52, - 0x56, 0x45, 0x10, 0x01, 0x2a, 0xc8, 0x01, 0x0a, 0x0b, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x14, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, - 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x17, - 0x0a, 0x13, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, - 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x01, 0x12, 0x23, 0x0a, 0x1f, 0x50, 0x52, 0x4f, 0x58, 0x59, - 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x54, 0x55, 0x4e, 0x4e, 0x45, 0x4c, 0x5f, 0x4e, - 0x4f, 0x54, 0x5f, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x24, 0x0a, 0x20, - 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x45, 0x52, - 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, - 0x10, 0x03, 0x12, 0x23, 0x0a, 0x1f, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, - 0x55, 0x53, 0x5f, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x5f, 0x46, - 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, 0x16, 0x0a, 0x12, 0x50, 0x52, 0x4f, 0x58, 0x59, - 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x05, 0x32, - 0xfc, 0x04, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x5f, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, - 0x74, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x6e, 0x61, - 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, - 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, + 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x22, 0x81, 0x01, 0x0a, 0x13, 0x53, 0x79, 0x6e, 0x63, 0x4d, + 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x32, + 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6d, + 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x61, + 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, + 0x69, 0x74, 0x12, 0x2f, 0x0a, 0x03, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1b, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x79, 0x6e, + 0x63, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x41, 0x63, 0x6b, 0x48, 0x00, 0x52, 0x03, + 0x61, 0x63, 0x6b, 0x42, 0x05, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x22, 0xdf, 0x01, 0x0a, 0x10, 0x53, + 0x79, 0x6e, 0x63, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x12, + 0x19, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x5f, + 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, + 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x41, 0x0a, 0x0c, 0x63, 0x61, 0x70, + 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1d, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x72, 0x6f, + 0x78, 0x79, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x52, 0x0c, + 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x22, 0x11, 0x0a, 0x0f, + 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x41, 0x63, 0x6b, 0x22, + 0x7e, 0x0a, 0x14, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x07, 0x6d, 0x61, 0x70, 0x70, 0x69, + 0x6e, 0x67, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, + 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x4d, 0x61, 0x70, 0x70, 0x69, + 0x6e, 0x67, 0x52, 0x07, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x32, 0x0a, 0x15, 0x69, + 0x6e, 0x69, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x63, 0x6f, 0x6d, 0x70, + 0x6c, 0x65, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x13, 0x69, 0x6e, 0x69, 0x74, + 0x69, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x2a, + 0x64, 0x0a, 0x16, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x55, 0x50, 0x44, + 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, + 0x10, 0x00, 0x12, 0x18, 0x0a, 0x14, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x01, 0x12, 0x17, 0x0a, 0x13, + 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, + 0x56, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x46, 0x0a, 0x0f, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x77, + 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x50, 0x41, 0x54, 0x48, + 0x5f, 0x52, 0x45, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x44, 0x45, 0x46, 0x41, 0x55, 0x4c, 0x54, + 0x10, 0x00, 0x12, 0x19, 0x0a, 0x15, 0x50, 0x41, 0x54, 0x48, 0x5f, 0x52, 0x45, 0x57, 0x52, 0x49, + 0x54, 0x45, 0x5f, 0x50, 0x52, 0x45, 0x53, 0x45, 0x52, 0x56, 0x45, 0x10, 0x01, 0x2a, 0xc8, 0x01, + 0x0a, 0x0b, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, + 0x14, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x50, 0x45, + 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x50, 0x52, 0x4f, 0x58, 0x59, + 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x01, + 0x12, 0x23, 0x0a, 0x1f, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, + 0x5f, 0x54, 0x55, 0x4e, 0x4e, 0x45, 0x4c, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x43, 0x52, 0x45, 0x41, + 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x24, 0x0a, 0x20, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, + 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, + 0x45, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x03, 0x12, 0x23, 0x0a, 0x1f, 0x50, + 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x43, 0x45, 0x52, 0x54, + 0x49, 0x46, 0x49, 0x43, 0x41, 0x54, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, + 0x12, 0x16, 0x0a, 0x12, 0x50, 0x52, 0x4f, 0x58, 0x59, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, + 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x05, 0x32, 0xd3, 0x05, 0x0a, 0x0c, 0x50, 0x72, 0x6f, + 0x78, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5f, 0x0a, 0x10, 0x47, 0x65, 0x74, + 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x2e, + 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, + 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, + 0x47, 0x65, 0x74, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x55, 0x0a, 0x0c, 0x53, 0x79, + 0x6e, 0x63, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x73, 0x12, 0x1f, 0x2e, 0x6d, 0x61, 0x6e, + 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x61, 0x70, 0x70, + 0x69, 0x6e, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x61, + 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x61, 0x70, + 0x70, 0x69, 0x6e, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x54, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4c, 0x6f, 0x67, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, @@ -2334,7 +2634,7 @@ func file_proxy_service_proto_rawDescGZIP() []byte { } var file_proxy_service_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_proxy_service_proto_msgTypes = make([]protoimpl.MessageInfo, 27) +var file_proxy_service_proto_msgTypes = make([]protoimpl.MessageInfo, 31) var file_proxy_service_proto_goTypes = []interface{}{ (ProxyMappingUpdateType)(0), // 0: management.ProxyMappingUpdateType (PathRewriteMode)(0), // 1: management.PathRewriteMode @@ -2364,19 +2664,23 @@ var file_proxy_service_proto_goTypes = []interface{}{ (*GetOIDCURLResponse)(nil), // 25: management.GetOIDCURLResponse (*ValidateSessionRequest)(nil), // 26: management.ValidateSessionRequest (*ValidateSessionResponse)(nil), // 27: management.ValidateSessionResponse - nil, // 28: management.PathTargetOptions.CustomHeadersEntry - nil, // 29: management.AccessLog.MetadataEntry - (*timestamppb.Timestamp)(nil), // 30: google.protobuf.Timestamp - (*durationpb.Duration)(nil), // 31: google.protobuf.Duration + (*SyncMappingsRequest)(nil), // 28: management.SyncMappingsRequest + (*SyncMappingsInit)(nil), // 29: management.SyncMappingsInit + (*SyncMappingsAck)(nil), // 30: management.SyncMappingsAck + (*SyncMappingsResponse)(nil), // 31: management.SyncMappingsResponse + nil, // 32: management.PathTargetOptions.CustomHeadersEntry + nil, // 33: management.AccessLog.MetadataEntry + (*timestamppb.Timestamp)(nil), // 34: google.protobuf.Timestamp + (*durationpb.Duration)(nil), // 35: google.protobuf.Duration } var file_proxy_service_proto_depIdxs = []int32{ - 30, // 0: management.GetMappingUpdateRequest.started_at:type_name -> google.protobuf.Timestamp + 34, // 0: management.GetMappingUpdateRequest.started_at:type_name -> google.protobuf.Timestamp 3, // 1: management.GetMappingUpdateRequest.capabilities:type_name -> management.ProxyCapabilities 11, // 2: management.GetMappingUpdateResponse.mapping:type_name -> management.ProxyMapping - 31, // 3: management.PathTargetOptions.request_timeout:type_name -> google.protobuf.Duration + 35, // 3: management.PathTargetOptions.request_timeout:type_name -> google.protobuf.Duration 1, // 4: management.PathTargetOptions.path_rewrite:type_name -> management.PathRewriteMode - 28, // 5: management.PathTargetOptions.custom_headers:type_name -> management.PathTargetOptions.CustomHeadersEntry - 31, // 6: management.PathTargetOptions.session_idle_timeout:type_name -> google.protobuf.Duration + 32, // 5: management.PathTargetOptions.custom_headers:type_name -> management.PathTargetOptions.CustomHeadersEntry + 35, // 6: management.PathTargetOptions.session_idle_timeout:type_name -> google.protobuf.Duration 6, // 7: management.PathMapping.options:type_name -> management.PathTargetOptions 8, // 8: management.Authentication.header_auths:type_name -> management.HeaderAuth 0, // 9: management.ProxyMapping.type:type_name -> management.ProxyMappingUpdateType @@ -2384,31 +2688,38 @@ var file_proxy_service_proto_depIdxs = []int32{ 9, // 11: management.ProxyMapping.auth:type_name -> management.Authentication 10, // 12: management.ProxyMapping.access_restrictions:type_name -> management.AccessRestrictions 14, // 13: management.SendAccessLogRequest.log:type_name -> management.AccessLog - 30, // 14: management.AccessLog.timestamp:type_name -> google.protobuf.Timestamp - 29, // 15: management.AccessLog.metadata:type_name -> management.AccessLog.MetadataEntry + 34, // 14: management.AccessLog.timestamp:type_name -> google.protobuf.Timestamp + 33, // 15: management.AccessLog.metadata:type_name -> management.AccessLog.MetadataEntry 17, // 16: management.AuthenticateRequest.password:type_name -> management.PasswordRequest 18, // 17: management.AuthenticateRequest.pin:type_name -> management.PinRequest 16, // 18: management.AuthenticateRequest.header_auth:type_name -> management.HeaderAuthRequest 2, // 19: management.SendStatusUpdateRequest.status:type_name -> management.ProxyStatus - 4, // 20: management.ProxyService.GetMappingUpdate:input_type -> management.GetMappingUpdateRequest - 12, // 21: management.ProxyService.SendAccessLog:input_type -> management.SendAccessLogRequest - 15, // 22: management.ProxyService.Authenticate:input_type -> management.AuthenticateRequest - 20, // 23: management.ProxyService.SendStatusUpdate:input_type -> management.SendStatusUpdateRequest - 22, // 24: management.ProxyService.CreateProxyPeer:input_type -> management.CreateProxyPeerRequest - 24, // 25: management.ProxyService.GetOIDCURL:input_type -> management.GetOIDCURLRequest - 26, // 26: management.ProxyService.ValidateSession:input_type -> management.ValidateSessionRequest - 5, // 27: management.ProxyService.GetMappingUpdate:output_type -> management.GetMappingUpdateResponse - 13, // 28: management.ProxyService.SendAccessLog:output_type -> management.SendAccessLogResponse - 19, // 29: management.ProxyService.Authenticate:output_type -> management.AuthenticateResponse - 21, // 30: management.ProxyService.SendStatusUpdate:output_type -> management.SendStatusUpdateResponse - 23, // 31: management.ProxyService.CreateProxyPeer:output_type -> management.CreateProxyPeerResponse - 25, // 32: management.ProxyService.GetOIDCURL:output_type -> management.GetOIDCURLResponse - 27, // 33: management.ProxyService.ValidateSession:output_type -> management.ValidateSessionResponse - 27, // [27:34] is the sub-list for method output_type - 20, // [20:27] is the sub-list for method input_type - 20, // [20:20] is the sub-list for extension type_name - 20, // [20:20] is the sub-list for extension extendee - 0, // [0:20] is the sub-list for field type_name + 29, // 20: management.SyncMappingsRequest.init:type_name -> management.SyncMappingsInit + 30, // 21: management.SyncMappingsRequest.ack:type_name -> management.SyncMappingsAck + 34, // 22: management.SyncMappingsInit.started_at:type_name -> google.protobuf.Timestamp + 3, // 23: management.SyncMappingsInit.capabilities:type_name -> management.ProxyCapabilities + 11, // 24: management.SyncMappingsResponse.mapping:type_name -> management.ProxyMapping + 4, // 25: management.ProxyService.GetMappingUpdate:input_type -> management.GetMappingUpdateRequest + 28, // 26: management.ProxyService.SyncMappings:input_type -> management.SyncMappingsRequest + 12, // 27: management.ProxyService.SendAccessLog:input_type -> management.SendAccessLogRequest + 15, // 28: management.ProxyService.Authenticate:input_type -> management.AuthenticateRequest + 20, // 29: management.ProxyService.SendStatusUpdate:input_type -> management.SendStatusUpdateRequest + 22, // 30: management.ProxyService.CreateProxyPeer:input_type -> management.CreateProxyPeerRequest + 24, // 31: management.ProxyService.GetOIDCURL:input_type -> management.GetOIDCURLRequest + 26, // 32: management.ProxyService.ValidateSession:input_type -> management.ValidateSessionRequest + 5, // 33: management.ProxyService.GetMappingUpdate:output_type -> management.GetMappingUpdateResponse + 31, // 34: management.ProxyService.SyncMappings:output_type -> management.SyncMappingsResponse + 13, // 35: management.ProxyService.SendAccessLog:output_type -> management.SendAccessLogResponse + 19, // 36: management.ProxyService.Authenticate:output_type -> management.AuthenticateResponse + 21, // 37: management.ProxyService.SendStatusUpdate:output_type -> management.SendStatusUpdateResponse + 23, // 38: management.ProxyService.CreateProxyPeer:output_type -> management.CreateProxyPeerResponse + 25, // 39: management.ProxyService.GetOIDCURL:output_type -> management.GetOIDCURLResponse + 27, // 40: management.ProxyService.ValidateSession:output_type -> management.ValidateSessionResponse + 33, // [33:41] is the sub-list for method output_type + 25, // [25:33] is the sub-list for method input_type + 25, // [25:25] is the sub-list for extension type_name + 25, // [25:25] is the sub-list for extension extendee + 0, // [0:25] is the sub-list for field type_name } func init() { file_proxy_service_proto_init() } @@ -2717,6 +3028,54 @@ func file_proxy_service_proto_init() { return nil } } + file_proxy_service_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SyncMappingsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_service_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SyncMappingsInit); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_service_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SyncMappingsAck); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_service_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SyncMappingsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_proxy_service_proto_msgTypes[0].OneofWrappers = []interface{}{} file_proxy_service_proto_msgTypes[12].OneofWrappers = []interface{}{ @@ -2726,13 +3085,17 @@ func file_proxy_service_proto_init() { } file_proxy_service_proto_msgTypes[17].OneofWrappers = []interface{}{} file_proxy_service_proto_msgTypes[20].OneofWrappers = []interface{}{} + file_proxy_service_proto_msgTypes[25].OneofWrappers = []interface{}{ + (*SyncMappingsRequest_Init)(nil), + (*SyncMappingsRequest_Ack)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proxy_service_proto_rawDesc, NumEnums: 3, - NumMessages: 27, + NumMessages: 31, NumExtensions: 0, NumServices: 1, }, diff --git a/shared/management/proto/proxy_service.proto b/shared/management/proto/proxy_service.proto index e359f0cbd..d1171b27e 100644 --- a/shared/management/proto/proxy_service.proto +++ b/shared/management/proto/proxy_service.proto @@ -12,6 +12,15 @@ import "google/protobuf/timestamp.proto"; service ProxyService { rpc GetMappingUpdate(GetMappingUpdateRequest) returns (stream GetMappingUpdateResponse); + // SyncMappings is a bidirectional stream that replaces GetMappingUpdate for + // new proxies. The proxy sends an initial SyncMappingsRequest to start the + // stream and then sends an ack after each batch is fully processed. + // Management waits for the ack before sending the next batch, providing + // application-level back-pressure during large initial syncs. + // Old proxies continue using GetMappingUpdate; old management servers + // return Unimplemented for this RPC and proxies fall back. + rpc SyncMappings(stream SyncMappingsRequest) returns (stream SyncMappingsResponse); + rpc SendAccessLog(SendAccessLogRequest) returns (SendAccessLogResponse); rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse); @@ -246,3 +255,35 @@ message ValidateSessionResponse { string user_email = 3; string denied_reason = 4; } + +// SyncMappingsRequest is sent by the proxy on the bidirectional SyncMappings +// stream. The first message MUST be an init; all subsequent messages MUST be +// acks. +message SyncMappingsRequest { + oneof msg { + SyncMappingsInit init = 1; + SyncMappingsAck ack = 2; + } +} + +// SyncMappingsInit is the first message on the stream, carrying the same +// identification fields as GetMappingUpdateRequest. +message SyncMappingsInit { + string proxy_id = 1; + string version = 2; + google.protobuf.Timestamp started_at = 3; + string address = 4; + ProxyCapabilities capabilities = 5; +} + +// SyncMappingsAck is sent by the proxy after it has fully processed a batch. +// Management waits for this before sending the next batch. +message SyncMappingsAck {} + +// SyncMappingsResponse is a batch of mappings sent by management. +// Identical semantics to GetMappingUpdateResponse. +message SyncMappingsResponse { + repeated ProxyMapping mapping = 1; + // initial_sync_complete is set on the last message of the initial snapshot. + bool initial_sync_complete = 2; +} diff --git a/shared/management/proto/proxy_service_grpc.pb.go b/shared/management/proto/proxy_service_grpc.pb.go index 627b217d8..fdc031ed7 100644 --- a/shared/management/proto/proxy_service_grpc.pb.go +++ b/shared/management/proto/proxy_service_grpc.pb.go @@ -19,6 +19,14 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ProxyServiceClient interface { GetMappingUpdate(ctx context.Context, in *GetMappingUpdateRequest, opts ...grpc.CallOption) (ProxyService_GetMappingUpdateClient, error) + // SyncMappings is a bidirectional stream that replaces GetMappingUpdate for + // new proxies. The proxy sends an initial SyncMappingsRequest to start the + // stream and then sends an ack after each batch is fully processed. + // Management waits for the ack before sending the next batch, providing + // application-level back-pressure during large initial syncs. + // Old proxies continue using GetMappingUpdate; old management servers + // return Unimplemented for this RPC and proxies fall back. + SyncMappings(ctx context.Context, opts ...grpc.CallOption) (ProxyService_SyncMappingsClient, error) SendAccessLog(ctx context.Context, in *SendAccessLogRequest, opts ...grpc.CallOption) (*SendAccessLogResponse, error) Authenticate(ctx context.Context, in *AuthenticateRequest, opts ...grpc.CallOption) (*AuthenticateResponse, error) SendStatusUpdate(ctx context.Context, in *SendStatusUpdateRequest, opts ...grpc.CallOption) (*SendStatusUpdateResponse, error) @@ -69,6 +77,37 @@ func (x *proxyServiceGetMappingUpdateClient) Recv() (*GetMappingUpdateResponse, return m, nil } +func (c *proxyServiceClient) SyncMappings(ctx context.Context, opts ...grpc.CallOption) (ProxyService_SyncMappingsClient, error) { + stream, err := c.cc.NewStream(ctx, &ProxyService_ServiceDesc.Streams[1], "/management.ProxyService/SyncMappings", opts...) + if err != nil { + return nil, err + } + x := &proxyServiceSyncMappingsClient{stream} + return x, nil +} + +type ProxyService_SyncMappingsClient interface { + Send(*SyncMappingsRequest) error + Recv() (*SyncMappingsResponse, error) + grpc.ClientStream +} + +type proxyServiceSyncMappingsClient struct { + grpc.ClientStream +} + +func (x *proxyServiceSyncMappingsClient) Send(m *SyncMappingsRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *proxyServiceSyncMappingsClient) Recv() (*SyncMappingsResponse, error) { + m := new(SyncMappingsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *proxyServiceClient) SendAccessLog(ctx context.Context, in *SendAccessLogRequest, opts ...grpc.CallOption) (*SendAccessLogResponse, error) { out := new(SendAccessLogResponse) err := c.cc.Invoke(ctx, "/management.ProxyService/SendAccessLog", in, out, opts...) @@ -128,6 +167,14 @@ func (c *proxyServiceClient) ValidateSession(ctx context.Context, in *ValidateSe // for forward compatibility type ProxyServiceServer interface { GetMappingUpdate(*GetMappingUpdateRequest, ProxyService_GetMappingUpdateServer) error + // SyncMappings is a bidirectional stream that replaces GetMappingUpdate for + // new proxies. The proxy sends an initial SyncMappingsRequest to start the + // stream and then sends an ack after each batch is fully processed. + // Management waits for the ack before sending the next batch, providing + // application-level back-pressure during large initial syncs. + // Old proxies continue using GetMappingUpdate; old management servers + // return Unimplemented for this RPC and proxies fall back. + SyncMappings(ProxyService_SyncMappingsServer) error SendAccessLog(context.Context, *SendAccessLogRequest) (*SendAccessLogResponse, error) Authenticate(context.Context, *AuthenticateRequest) (*AuthenticateResponse, error) SendStatusUpdate(context.Context, *SendStatusUpdateRequest) (*SendStatusUpdateResponse, error) @@ -146,6 +193,9 @@ type UnimplementedProxyServiceServer struct { func (UnimplementedProxyServiceServer) GetMappingUpdate(*GetMappingUpdateRequest, ProxyService_GetMappingUpdateServer) error { return status.Errorf(codes.Unimplemented, "method GetMappingUpdate not implemented") } +func (UnimplementedProxyServiceServer) SyncMappings(ProxyService_SyncMappingsServer) error { + return status.Errorf(codes.Unimplemented, "method SyncMappings not implemented") +} func (UnimplementedProxyServiceServer) SendAccessLog(context.Context, *SendAccessLogRequest) (*SendAccessLogResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SendAccessLog not implemented") } @@ -198,6 +248,32 @@ func (x *proxyServiceGetMappingUpdateServer) Send(m *GetMappingUpdateResponse) e return x.ServerStream.SendMsg(m) } +func _ProxyService_SyncMappings_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ProxyServiceServer).SyncMappings(&proxyServiceSyncMappingsServer{stream}) +} + +type ProxyService_SyncMappingsServer interface { + Send(*SyncMappingsResponse) error + Recv() (*SyncMappingsRequest, error) + grpc.ServerStream +} + +type proxyServiceSyncMappingsServer struct { + grpc.ServerStream +} + +func (x *proxyServiceSyncMappingsServer) Send(m *SyncMappingsResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *proxyServiceSyncMappingsServer) Recv() (*SyncMappingsRequest, error) { + m := new(SyncMappingsRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _ProxyService_SendAccessLog_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SendAccessLogRequest) if err := dec(in); err != nil { @@ -344,6 +420,12 @@ var ProxyService_ServiceDesc = grpc.ServiceDesc{ Handler: _ProxyService_GetMappingUpdate_Handler, ServerStreams: true, }, + { + StreamName: "SyncMappings", + Handler: _ProxyService_SyncMappings_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "proxy_service.proto", }