fix(proxy): harden inbound listener resource + startup-ctx handling

Three correctness fixes on the per-account inbound path, with tests:

- Close the logrus ErrorLog PipeWriter on tearDown. WriterLevel hands
  back an *io.PipeWriter backed by a pipe + scanner goroutine that the
  caller owns; the two writers per account (https + plain) were never
  closed, leaking the pipe and goroutine on every teardown.
- Run the post-Start hooks on context.Background(). runClientStartup
  is launched in a goroutine from AddPeer and was inheriting the
  caller's request-scoped ctx, so a cancelled request could abort the
  inbound bring-up or fail the management status notification. The
  tail is split into notifyClientReady so the contract is testable.

Tests cover the PipeWriter close behaviour and assert the readyHandler
+ NotifyStatus calls receive a non-cancelled background context.
This commit is contained in:
mlsmaycon
2026-05-26 21:58:52 +02:00
parent e09d51c1a8
commit 878344088f
4 changed files with 156 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
stdlog "log"
"net"
"net/http"
@@ -55,15 +56,18 @@ type inboundManager struct {
}
// inboundEntry owns the listeners, router and HTTP servers for a single
// account's embedded netstack.
// account's embedded netstack. errorLogWriters retain the logrus pipe
// writers backing each http.Server's ErrorLog so tearDown can close
// them — otherwise the pipe + its scanner goroutine leak per account.
type inboundEntry struct {
router *nbtcp.Router
tlsListener net.Listener
plainListener net.Listener
httpsServer *http.Server
httpServer *http.Server
cancel context.CancelFunc
wg sync.WaitGroup
router *nbtcp.Router
tlsListener net.Listener
plainListener net.Listener
httpsServer *http.Server
httpServer *http.Server
errorLogWriters []*io.PipeWriter
cancel context.CancelFunc
wg sync.WaitGroup
}
// pendingInboundRoute holds a route that arrived before the account's
@@ -147,30 +151,34 @@ func (m *inboundManager) bringUp(ctx context.Context, accountID types.AccountID,
return types.WithOverlayOrigin(ctx)
}
httpsErrLog, httpsErrW := newInboundErrorLog(m.logger, "https", accountID)
httpErrLog, httpErrW := newInboundErrorLog(m.logger, "http", accountID)
httpsServer := &http.Server{
Handler: scopedHandler,
TLSConfig: m.tlsConfig,
ReadHeaderTimeout: httpInboundReadHeaderTimeout,
IdleTimeout: httpInboundIdleTimeout,
ErrorLog: newInboundErrorLog(m.logger, "https", accountID),
ErrorLog: httpsErrLog,
ConnContext: markOverlayOrigin,
}
httpServer := &http.Server{
Handler: scopedHandler,
ReadHeaderTimeout: httpInboundReadHeaderTimeout,
IdleTimeout: httpInboundIdleTimeout,
ErrorLog: newInboundErrorLog(m.logger, "http", accountID),
ErrorLog: httpErrLog,
ConnContext: markOverlayOrigin,
}
runCtx, cancel := context.WithCancel(ctx)
entry := &inboundEntry{
router: router,
tlsListener: tlsListener,
plainListener: plainListener,
httpsServer: httpsServer,
httpServer: httpServer,
cancel: cancel,
router: router,
tlsListener: tlsListener,
plainListener: plainListener,
httpsServer: httpsServer,
httpServer: httpServer,
errorLogWriters: []*io.PipeWriter{httpsErrW, httpErrW},
cancel: cancel,
}
entry.wg.Add(1)
@@ -237,6 +245,14 @@ func (m *inboundManager) tearDown(accountID types.AccountID, entry *inboundEntry
m.logger.Debugf("close per-account plain listener: %v", err)
}
entry.wg.Wait()
// Close the ErrorLog pipes only after the http.Servers have fully
// stopped so any straggling stdlib write doesn't race with the
// close. Each writer also tears down the logrus scanner goroutine.
for _, w := range entry.errorLogWriters {
if err := w.Close(); err != nil {
m.logger.Debugf("close per-account inbound error log writer: %v", err)
}
}
}
// AddRoute records an SNI/host route on the account's per-account router.
@@ -538,10 +554,14 @@ func (a inboundDebugAdapter) InboundListeners() map[types.AccountID]debug.Inboun
}
// newInboundErrorLog routes a per-account http.Server's stdlib error
// stream through logrus at warn level.
func newInboundErrorLog(logger *log.Logger, scheme string, accountID types.AccountID) *stdlog.Logger {
return stdlog.New(logger.WithFields(log.Fields{
// stream through logrus at warn level. The returned PipeWriter must be
// closed by the caller (tearDown) once the http.Server has shut down —
// otherwise the pipe and its scanner goroutine leak per account, see
// logrus.Entry.WriterLevel.
func newInboundErrorLog(logger *log.Logger, scheme string, accountID types.AccountID) (*stdlog.Logger, *io.PipeWriter) {
w := logger.WithFields(log.Fields{
"inbound-http": scheme,
"account_id": accountID,
}).WriterLevel(log.WarnLevel), "", 0)
}).WriterLevel(log.WarnLevel)
return stdlog.New(w, "", 0), w
}

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"crypto/tls"
"io"
"net"
"net/http"
"net/http/httptest"
@@ -482,6 +483,38 @@ func selfSignedTLSConfig(t *testing.T) *tls.Config {
return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12} //nolint:gosec
}
// TestNewInboundErrorLog_WriterIsCloseable guards the close path on the
// logrus PipeWriter that backs each per-account http.Server's ErrorLog.
// logrus.Entry.WriterLevel returns an *io.PipeWriter that owns a pipe +
// scanner goroutine; the caller must Close() it on teardown or the
// resources leak per account. The contract is verified two ways:
//
// - the constructor returns a non-nil writer the caller can keep,
// - writing to the writer after Close() fails with io.ErrClosedPipe,
// which is the only externally observable sign that Close was wired.
//
// A leaking refactor (forgetting to thread the writer to tearDown, or
// dropping the Close call) would still pass this test individually but
// fail an integration goleak check; this unit test is the cheap first
// line of defence.
func TestNewInboundErrorLog_WriterIsCloseable(t *testing.T) {
logger := quietLogger()
stdLog, writer := newInboundErrorLog(logger, "https", types.AccountID("acct-1"))
require.NotNil(t, stdLog, "newInboundErrorLog must return a non-nil *log.Logger")
require.NotNil(t, writer, "newInboundErrorLog must return the underlying PipeWriter so tearDown can Close it")
// First Close succeeds.
require.NoError(t, writer.Close(), "PipeWriter.Close should succeed the first time")
// After Close, the writer must refuse new writes — that's the only
// behavioural signal that the pipe (and its scanner goroutine) has
// shut down.
_, err := writer.Write([]byte("post-close write\n"))
require.ErrorIs(t, err, io.ErrClosedPipe,
"writes after Close must surface io.ErrClosedPipe so callers know the writer is gone")
}
// testCertPEM / testKeyPEM are a minimal RSA self-signed cert for
// 127.0.0.1 — only used by tests that need a working TLS handshake.
var testCertPEM = []byte(`-----BEGIN CERTIFICATE-----

View File

@@ -242,8 +242,10 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se
}).Info("created new client for account")
// Attempt to start the client in the background; if this fails we will
// retry on the first request via RoundTrip.
go n.runClientStartup(ctx, accountID, entry.client)
// retry on the first request via RoundTrip. runClientStartup uses its
// own background context so the caller's request-scoped ctx can't
// cancel the inbound bring-up.
go n.runClientStartup(accountID, entry.client)
return nil
}
@@ -355,8 +357,14 @@ func (n *NetBird) createClientEntry(ctx context.Context, accountID types.Account
}, nil
}
// runClientStartup starts the client and notifies registered services on success.
func (n *NetBird) runClientStartup(ctx context.Context, accountID types.AccountID, client *embed.Client) {
// runClientStartup starts the client and notifies registered services on
// success. This function runs in a goroutine launched from AddPeer, so it
// must never inherit the caller's request-scoped context — a canceled
// request must not abort the inbound listener bring-up or the management
// status notification. The embedded client.Start gets its own bounded
// startCtx; once Start succeeds, notifyClientReady takes over with a
// fresh context.Background() (see that function for the contract).
func (n *NetBird) runClientStartup(accountID types.AccountID, client *embed.Client) {
startCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
@@ -369,7 +377,17 @@ func (n *NetBird) runClientStartup(ctx context.Context, accountID types.AccountI
return
}
// Mark client as started and collect services to notify outside the lock.
n.notifyClientReady(accountID, client)
}
// notifyClientReady marks the account's client as started, fires the
// readyHandler hook, and notifies management of the new tunnel
// connection for every registered service. It is split out of
// runClientStartup so a regression test can drive the post-Start tail
// without needing a live embedded client. The contract that the
// hooks/notifier see context.Background() — never the AddPeer caller's
// ctx — lives here.
func (n *NetBird) notifyClientReady(accountID types.AccountID, client *embed.Client) {
n.clientsMux.Lock()
entry, exists := n.clients[accountID]
if exists {
@@ -384,8 +402,10 @@ func (n *NetBird) runClientStartup(ctx context.Context, accountID types.AccountI
readyHandler := n.readyHandler
n.clientsMux.Unlock()
bgCtx := context.Background()
if readyHandler != nil {
state := readyHandler(ctx, accountID, client)
state := readyHandler(bgCtx, accountID, client)
n.clientsMux.Lock()
if e, ok := n.clients[accountID]; ok {
e.inbound = state
@@ -404,7 +424,7 @@ func (n *NetBird) runClientStartup(ctx context.Context, accountID types.AccountI
return
}
for _, sn := range toNotify {
if err := n.statusNotifier.NotifyStatus(ctx, accountID, sn.serviceID, true); err != nil {
if err := n.statusNotifier.NotifyStatus(bgCtx, accountID, sn.serviceID, true); err != nil {
n.logger.WithFields(log.Fields{
"account_id": accountID,
"service_key": sn.key,

View File

@@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/netbirdio/netbird/client/embed"
"github.com/netbirdio/netbird/proxy/internal/types"
"github.com/netbirdio/netbird/shared/management/proto"
)
@@ -30,12 +31,15 @@ type statusCall struct {
accountID types.AccountID
serviceID types.ServiceID
connected bool
// ctx is captured so tests can assert the notifier received a
// fresh background context rather than an inherited request ctx.
ctx context.Context
}
func (m *mockStatusNotifier) NotifyStatus(_ context.Context, accountID types.AccountID, serviceID types.ServiceID, connected bool) error {
func (m *mockStatusNotifier) NotifyStatus(ctx context.Context, accountID types.AccountID, serviceID types.ServiceID, connected bool) error {
m.mu.Lock()
defer m.mu.Unlock()
m.statuses = append(m.statuses, statusCall{accountID, serviceID, connected})
m.statuses = append(m.statuses, statusCall{accountID, serviceID, connected, ctx})
return nil
}
@@ -360,3 +364,53 @@ func TestNetBird_RemovePeer_NotifiesDisconnection(t *testing.T) {
assert.Equal(t, types.ServiceID("svc-1"), calls[0].serviceID)
assert.False(t, calls[0].connected)
}
// TestNotifyClientReady_UsesBackgroundCtx pins the contract that the
// post-Start hooks (readyHandler + statusNotifier.NotifyStatus) run on
// a fresh context.Background() rather than inheriting the AddPeer
// caller's request- or stream-scoped ctx. Without this, a cancelled
// caller ctx could abort the inbound listener bring-up or cause the
// management status notification to fail spuriously and leave the
// account in a half-connected state.
func TestNotifyClientReady_UsesBackgroundCtx(t *testing.T) {
notifier := &mockStatusNotifier{}
nb := NewNetBird("test-proxy", "invalid.test", ClientConfig{
MgmtAddr: "http://invalid.test:9999",
}, nil, notifier, &mockMgmtClient{})
accountID := types.AccountID("acct-async")
// Pre-populate a client entry so notifyClientReady has something
// to mark started + something to enumerate for NotifyStatus.
nb.clientsMux.Lock()
nb.clients[accountID] = &clientEntry{
services: map[ServiceKey]serviceInfo{
DomainServiceKey("svc.example"): {serviceID: types.ServiceID("svc-1")},
},
}
nb.clientsMux.Unlock()
var capturedReadyCtx context.Context
nb.SetClientLifecycle(
func(ctx context.Context, _ types.AccountID, _ *embed.Client) any {
capturedReadyCtx = ctx
return nil
},
nil,
)
// Drive the post-Start path directly; a real client.Start would
// need a working management URL.
nb.notifyClientReady(accountID, nil)
require.NotNil(t, capturedReadyCtx, "readyHandler must have been invoked")
require.NoError(t, capturedReadyCtx.Err(),
"readyHandler must receive a background context, not an inherited cancelled one")
deadline, ok := capturedReadyCtx.Deadline()
assert.False(t, ok, "readyHandler ctx must have no deadline (background); got %v", deadline)
calls := notifier.calls()
require.Len(t, calls, 1, "NotifyStatus must be invoked once per registered service")
require.NotNil(t, calls[0].ctx, "NotifyStatus must receive a context")
require.NoError(t, calls[0].ctx.Err(),
"NotifyStatus must receive a background context, not an inherited cancelled one")
}