mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-28 11:39:55 +00:00
Compare commits
2 Commits
follow-up-
...
feature/io
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ea0882975 | ||
|
|
944a258459 |
39
.github/workflows/proto-version-check.yml
vendored
39
.github/workflows/proto-version-check.yml
vendored
@@ -20,30 +20,15 @@ jobs:
|
||||
per_page: 100,
|
||||
});
|
||||
|
||||
// Cover renamed .pb.go files in addition to plain edits.
|
||||
// Renamed entries land under the new path with previous_filename
|
||||
// pointing at the base-side name, so we read the base content
|
||||
// from the old path when present.
|
||||
const changedPbFiles = files
|
||||
.filter(f => (f.status === 'modified' || f.status === 'renamed')
|
||||
&& f.filename.endsWith('.pb.go'))
|
||||
.map(f => ({
|
||||
headPath: f.filename,
|
||||
basePath: f.previous_filename || f.filename,
|
||||
}));
|
||||
if (changedPbFiles.length === 0) {
|
||||
console.log('No modified or renamed .pb.go files to check');
|
||||
const modifiedPbFiles = files.filter(
|
||||
f => f.filename.endsWith('.pb.go') && f.status === 'modified'
|
||||
);
|
||||
if (modifiedPbFiles.length === 0) {
|
||||
console.log('No modified .pb.go files to check');
|
||||
return;
|
||||
}
|
||||
|
||||
// Matches the generator version headers protoc writes at the top
|
||||
// of generated files:
|
||||
// // protoc v3.21.12
|
||||
// // protoc-gen-go v1.26.0
|
||||
// // - protoc-gen-go-grpc v1.6.1 (grpc files prefix with "- ")
|
||||
// The optional "- " prefix and the optional -gen-go / -gen-go-grpc
|
||||
// suffixes keep the *_grpc.pb.go headers in scope.
|
||||
const versionPattern = /^\s*\/\/\s+(?:-\s+)?protoc(?:-gen-go(?:-grpc)?)?\s+v[\d.]+/;
|
||||
const versionPattern = /^\s*\/\/\s+protoc(?:-gen-go)?\s+v[\d.]+/;
|
||||
const baseSha = context.payload.pull_request.base.sha;
|
||||
const headSha = context.payload.pull_request.head.sha;
|
||||
|
||||
@@ -70,22 +55,20 @@ jobs:
|
||||
}
|
||||
|
||||
const violations = [];
|
||||
for (const file of changedPbFiles) {
|
||||
for (const file of modifiedPbFiles) {
|
||||
const [base, head] = await Promise.all([
|
||||
getVersionHeader(file.basePath, baseSha),
|
||||
getVersionHeader(file.headPath, headSha),
|
||||
getVersionHeader(file.filename, baseSha),
|
||||
getVersionHeader(file.filename, headSha),
|
||||
]);
|
||||
if (!base.ok || !head.ok) {
|
||||
core.warning(
|
||||
`Skipping ${file.headPath}: base=${base.ok ? 'ok' : base.reason}, head=${head.ok ? 'ok' : head.reason}`
|
||||
`Skipping ${file.filename}: base=${base.ok ? 'ok' : base.reason}, head=${head.ok ? 'ok' : head.reason}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (base.lines.join('\n') !== head.lines.join('\n')) {
|
||||
violations.push({
|
||||
file: file.basePath === file.headPath
|
||||
? file.headPath
|
||||
: `${file.basePath} → ${file.headPath}`,
|
||||
file: file.filename,
|
||||
base: base.lines,
|
||||
head: head.lines,
|
||||
});
|
||||
|
||||
@@ -310,12 +310,8 @@ func (d *Status) PeerByIP(ip string) (string, bool) {
|
||||
|
||||
// PeerStateByIP returns the full peer State for the given tunnel IP.
|
||||
// Matches against either the IPv4 (State.IP) or IPv6 (State.IPv6) tunnel
|
||||
// address so dual-stack peers are reachable on either family. Searches
|
||||
// both d.peers and d.offlinePeers — peers that have been moved into
|
||||
// the offline slice by ReplaceOfflinePeers are still part of the
|
||||
// account's roster and callers (DNS filter, embed.Client.IdentityForIP)
|
||||
// need to recognise them rather than treating them as unknown. Returns
|
||||
// the zero State and false when no peer matches or the input is empty.
|
||||
// address so dual-stack peers are reachable on either family. Returns the
|
||||
// zero State and false when no peer matches or the input is empty.
|
||||
func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
if ip == "" {
|
||||
return State{}, false
|
||||
@@ -328,11 +324,6 @@ func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
return state, true
|
||||
}
|
||||
}
|
||||
for _, state := range d.offlinePeers {
|
||||
if (state.IP != "" && state.IP == ip) || (state.IPv6 != "" && state.IPv6 == ip) {
|
||||
return state, true
|
||||
}
|
||||
}
|
||||
return State{}, false
|
||||
}
|
||||
|
||||
|
||||
@@ -90,28 +90,6 @@ func TestStatus_PeerStateByIP_MatchesIPv6(t *testing.T) {
|
||||
req.Equal("pk-1", state.PubKey, "matching state must carry the right pub key")
|
||||
}
|
||||
|
||||
// TestStatus_PeerStateByIP_MatchesOfflinePeers covers peers that have
|
||||
// been moved into the offline slice via ReplaceOfflinePeers. Callers
|
||||
// (DNS filter, embed.Client.IdentityForIP) need to treat them as known
|
||||
// rather than unknown — otherwise authentication / DNS filtering treats
|
||||
// known-but-offline peers as foreign IPs.
|
||||
func TestStatus_PeerStateByIP_MatchesOfflinePeers(t *testing.T) {
|
||||
status := NewRecorder("https://mgm")
|
||||
req := require.New(t)
|
||||
|
||||
status.ReplaceOfflinePeers([]State{
|
||||
{PubKey: "pk-offline", FQDN: "offline.netbird", IP: "100.64.0.20", IPv6: "fd00::20"},
|
||||
})
|
||||
|
||||
state, ok := status.PeerStateByIP("100.64.0.20")
|
||||
req.True(ok, "offline peer must resolve by IPv4 tunnel address")
|
||||
req.Equal("pk-offline", state.PubKey, "matching state must carry the offline peer's pub key")
|
||||
|
||||
state, ok = status.PeerStateByIP("fd00::20")
|
||||
req.True(ok, "offline peer must resolve by IPv6 tunnel address")
|
||||
req.Equal("pk-offline", state.PubKey, "IPv6 match must carry the offline peer's pub key")
|
||||
}
|
||||
|
||||
func TestStatus_UpdatePeerFQDN(t *testing.T) {
|
||||
key := "abc"
|
||||
fqdn := "peer-a.netbird.local"
|
||||
|
||||
@@ -76,6 +76,9 @@ type Client struct {
|
||||
dnsManager dns.IosDnsManager
|
||||
loginComplete bool
|
||||
connectClient *internal.ConnectClient
|
||||
// config holds the active configuration once Run has loaded it. Consumed by
|
||||
// the in-app SSH client for the NetBird SSH key and the OAuth flow.
|
||||
config *profilemanager.Config
|
||||
// preloadedConfig holds config loaded from JSON (used on tvOS where file writes are blocked)
|
||||
preloadedConfig *profilemanager.Config
|
||||
}
|
||||
@@ -160,6 +163,7 @@ func (c *Client) Run(fd int32, interfaceName string, envList *EnvList) error {
|
||||
ctx = internal.CtxInitState(ctx)
|
||||
c.onHostDnsFn = func([]string) {}
|
||||
cfg.WgIface = interfaceName
|
||||
c.config = cfg
|
||||
|
||||
c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder)
|
||||
return c.connectClient.RunOniOS(fd, c.networkChangeListener, c.dnsManager, c.stateFile)
|
||||
@@ -527,6 +531,13 @@ func (c *Client) DeselectRoute(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// sshState returns the active config and the running connect client for the
|
||||
// in-app SSH client. Both are nil until Run has loaded the config and started
|
||||
// the tunnel.
|
||||
func (c *Client) sshState() (*profilemanager.Config, *internal.ConnectClient) {
|
||||
return c.config, c.connectClient
|
||||
}
|
||||
|
||||
func formatDuration(d time.Duration) string {
|
||||
ds := d.String()
|
||||
dotIndex := strings.Index(ds, ".")
|
||||
|
||||
431
client/ios/NetBirdSDK/ssh_client.go
Normal file
431
client/ios/NetBirdSDK/ssh_client.go
Normal file
@@ -0,0 +1,431 @@
|
||||
//go:build ios
|
||||
|
||||
package NetBirdSDK
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
gossh "golang.org/x/crypto/ssh"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal"
|
||||
nbssh "github.com/netbirdio/netbird/client/ssh"
|
||||
"github.com/netbirdio/netbird/client/ssh/detection"
|
||||
)
|
||||
|
||||
const (
|
||||
sshDialTimeout = 30 * time.Second
|
||||
sshDetectionTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// SSHTerminalListener receives SSH session events. It is implemented in Swift.
|
||||
//
|
||||
// All callbacks are invoked from goroutines and may run concurrently with each
|
||||
// other; the implementation must be safe to call from any thread.
|
||||
type SSHTerminalListener interface {
|
||||
OnConnected()
|
||||
OnData(data []byte)
|
||||
OnClose(reason string)
|
||||
OnError(message string)
|
||||
}
|
||||
|
||||
// SSHClient is a NetBird-aware SSH client exposed to Swift via gomobile.
|
||||
//
|
||||
// It dials through the running NetBird tunnel and runs a standard SSH session
|
||||
// on top with PTY enabled. Host-key verification uses the NetBird-provided
|
||||
// peer SSH host keys, identical to the desktop client.
|
||||
type SSHClient struct {
|
||||
nb *Client
|
||||
mu sync.Mutex
|
||||
listener SSHTerminalListener
|
||||
urlOpener URLOpener
|
||||
|
||||
sshClient *gossh.Client
|
||||
session *gossh.Session
|
||||
stdin io.WriteCloser
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewSSHClient creates a new SSH client bound to the running NetBird Client.
|
||||
func NewSSHClient(c *Client) *SSHClient {
|
||||
return &SSHClient{nb: c}
|
||||
}
|
||||
|
||||
// SetListener registers the Swift listener. Must be called before Connect to
|
||||
// receive any events.
|
||||
func (s *SSHClient) SetListener(l SSHTerminalListener) {
|
||||
s.mu.Lock()
|
||||
s.listener = l
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// SetURLOpener registers the Swift URL opener used to display the device-code
|
||||
// authorization page in an in-app browser when the target peer requires JWT
|
||||
// authentication. Must be set before Connect to be effective.
|
||||
func (s *SSHClient) SetURLOpener(opener URLOpener) {
|
||||
s.mu.Lock()
|
||||
s.urlOpener = opener
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Connect dials the SSH server through the NetBird tunnel and performs the
|
||||
// SSH handshake. It auto-detects the server type via SSH banner inspection
|
||||
// and selects the appropriate authentication path:
|
||||
//
|
||||
// - NetBird-SSH server requiring JWT: launches the OAuth 2.0 device-code
|
||||
// flow, opens the verification URL through the registered URLOpener, and
|
||||
// uses the resulting token as the SSH password. Host-key verification
|
||||
// uses the NetBird peer registry.
|
||||
// - NetBird-SSH server without JWT: authenticates with the NetBird SSH
|
||||
// private key. Host-key verification uses the NetBird peer registry.
|
||||
// - Regular SSH server (e.g. OpenSSH): authenticates with the NetBird key
|
||||
// first (so a user-installed NetBird public key works), then falls back
|
||||
// to the supplied password if non-empty. Host-key verification is
|
||||
// disabled (TOFU pending).
|
||||
//
|
||||
// The password parameter is only consulted for regular SSH servers.
|
||||
func (s *SSHClient) Connect(host string, port int, user, password string) error {
|
||||
cfg, cc := s.nb.sshState()
|
||||
if cc == nil {
|
||||
return errors.New("netbird client not running")
|
||||
}
|
||||
if cfg == nil {
|
||||
return errors.New("netbird config not loaded")
|
||||
}
|
||||
engine := cc.Engine()
|
||||
if engine == nil {
|
||||
return errors.New("netbird engine not available")
|
||||
}
|
||||
|
||||
serverType := detectServerType(host, port)
|
||||
log.Infof("SSH server type for %s:%d: %s", host, port, serverType)
|
||||
|
||||
authMethods, hostKeyCallback, err := s.buildAuth(cfg, engine, serverType, password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientConfig := &gossh.ClientConfig{
|
||||
User: user,
|
||||
Auth: authMethods,
|
||||
HostKeyCallback: hostKeyCallback,
|
||||
Timeout: sshDialTimeout,
|
||||
}
|
||||
return s.dialAndHandshake(host, port, clientConfig)
|
||||
}
|
||||
|
||||
// StartSession requests a PTY and starts an interactive shell. Output from
|
||||
// the session is forwarded to the listener via OnData.
|
||||
func (s *SSHClient) StartSession(cols, rows int) error {
|
||||
log.Debugf("SSH: starting session %dx%d", cols, rows)
|
||||
s.mu.Lock()
|
||||
sshClient := s.sshClient
|
||||
s.mu.Unlock()
|
||||
|
||||
if sshClient == nil {
|
||||
return errors.New("ssh client not connected")
|
||||
}
|
||||
|
||||
session, err := sshClient.NewSession()
|
||||
if err != nil {
|
||||
return fmt.Errorf("new session: %w", err)
|
||||
}
|
||||
|
||||
modes := gossh.TerminalModes{
|
||||
gossh.ECHO: 1,
|
||||
gossh.TTY_OP_ISPEED: 14400,
|
||||
gossh.TTY_OP_OSPEED: 14400,
|
||||
gossh.VINTR: 3,
|
||||
gossh.VQUIT: 28,
|
||||
gossh.VERASE: 127,
|
||||
}
|
||||
if err := session.RequestPty("xterm-256color", rows, cols, modes); err != nil {
|
||||
closeQuiet(session, "session after pty error")
|
||||
return fmt.Errorf("request pty: %w", err)
|
||||
}
|
||||
|
||||
stdin, err := session.StdinPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stdin error")
|
||||
return fmt.Errorf("stdin pipe: %w", err)
|
||||
}
|
||||
stdout, err := session.StdoutPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stdout error")
|
||||
return fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := session.StderrPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stderr error")
|
||||
return fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := session.Shell(); err != nil {
|
||||
closeQuiet(session, "session after shell error")
|
||||
return fmt.Errorf("start shell: %w", err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.session = session
|
||||
s.stdin = stdin
|
||||
s.mu.Unlock()
|
||||
|
||||
go s.readLoop(stdout, "stdout")
|
||||
go s.readLoop(stderr, "stderr")
|
||||
log.Debug("SSH: session started, shell running")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write sends data to the SSH session stdin.
|
||||
func (s *SSHClient) Write(data []byte) error {
|
||||
s.mu.Lock()
|
||||
stdin := s.stdin
|
||||
s.mu.Unlock()
|
||||
if stdin == nil {
|
||||
return errors.New("ssh session not started")
|
||||
}
|
||||
if _, err := stdin.Write(data); err != nil {
|
||||
return fmt.Errorf("write stdin: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Resize updates the PTY window size.
|
||||
func (s *SSHClient) Resize(cols, rows int) error {
|
||||
s.mu.Lock()
|
||||
session := s.session
|
||||
s.mu.Unlock()
|
||||
if session == nil {
|
||||
return errors.New("ssh session not started")
|
||||
}
|
||||
return session.WindowChange(rows, cols)
|
||||
}
|
||||
|
||||
// Close terminates the SSH session and underlying connection. Safe to call
|
||||
// multiple times.
|
||||
func (s *SSHClient) Close() error {
|
||||
s.mu.Lock()
|
||||
sshClient := s.sshClient
|
||||
session := s.session
|
||||
stdin := s.stdin
|
||||
s.sshClient = nil
|
||||
s.session = nil
|
||||
s.stdin = nil
|
||||
s.mu.Unlock()
|
||||
|
||||
if stdin != nil {
|
||||
if err := stdin.Close(); err != nil {
|
||||
log.Debugf("ssh: stdin close: %v", err)
|
||||
}
|
||||
}
|
||||
if session != nil {
|
||||
if err := session.Close(); err != nil && !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh: session close: %v", err)
|
||||
}
|
||||
}
|
||||
var firstErr error
|
||||
if sshClient != nil {
|
||||
if err := sshClient.Close(); err != nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
s.notifyClose("closed by client")
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (s *SSHClient) buildAuth(cfg *profilemanager.Config, engine *internal.Engine,
|
||||
serverType detection.ServerType, password string) ([]gossh.AuthMethod, gossh.HostKeyCallback, error) {
|
||||
|
||||
switch serverType {
|
||||
case detection.ServerTypeNetBirdJWT:
|
||||
token, err := s.requestJWTToken(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("jwt: %w", err)
|
||||
}
|
||||
auths := []gossh.AuthMethod{gossh.Password(token)}
|
||||
return auths, nbssh.CreateHostKeyCallback(&engineHostKeyVerifier{engine: engine}), nil
|
||||
|
||||
case detection.ServerTypeNetBirdNoJWT:
|
||||
if cfg.SSHKey == "" {
|
||||
return nil, nil, errors.New("no NetBird SSH key available")
|
||||
}
|
||||
signer, err := gossh.ParsePrivateKey([]byte(cfg.SSHKey))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("parse netbird ssh key: %w", err)
|
||||
}
|
||||
auths := []gossh.AuthMethod{gossh.PublicKeys(signer)}
|
||||
return auths, nbssh.CreateHostKeyCallback(&engineHostKeyVerifier{engine: engine}), nil
|
||||
|
||||
default: // regular SSH
|
||||
var auths []gossh.AuthMethod
|
||||
if cfg.SSHKey != "" {
|
||||
if signer, err := gossh.ParsePrivateKey([]byte(cfg.SSHKey)); err == nil {
|
||||
auths = append(auths, gossh.PublicKeys(signer))
|
||||
} else {
|
||||
log.Debugf("ssh: parse netbird key for regular auth: %v", err)
|
||||
}
|
||||
}
|
||||
if password != "" {
|
||||
pw := password
|
||||
auths = append(auths, gossh.Password(pw))
|
||||
auths = append(auths, gossh.KeyboardInteractive(func(_, _ string, questions []string, _ []bool) ([]string, error) {
|
||||
answers := make([]string, len(questions))
|
||||
for i := range questions {
|
||||
answers[i] = pw
|
||||
}
|
||||
return answers, nil
|
||||
}))
|
||||
}
|
||||
if len(auths) == 0 {
|
||||
return nil, nil, errors.New("no auth method available: provide a password or configure NetBird SSH key")
|
||||
}
|
||||
return auths, gossh.InsecureIgnoreHostKey(), nil // nolint:gosec // TOFU not yet implemented
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSHClient) requestJWTToken(cfg *profilemanager.Config) (string, error) {
|
||||
s.mu.Lock()
|
||||
urlOpener := s.urlOpener
|
||||
s.mu.Unlock()
|
||||
if urlOpener == nil {
|
||||
return "", errors.New("URL opener not configured for JWT auth")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
flow, err := auth.NewOAuthFlow(ctx, cfg, false, true, profilemanager.GetLoginHint())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create oauth flow: %w", err)
|
||||
}
|
||||
|
||||
flowInfo, err := flow.RequestAuthInfo(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("request auth info: %w", err)
|
||||
}
|
||||
|
||||
go urlOpener.Open(flowInfo.VerificationURIComplete, flowInfo.UserCode)
|
||||
|
||||
tokenInfo, err := flow.WaitToken(ctx, flowInfo)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("wait for token: %w", err)
|
||||
}
|
||||
|
||||
token := tokenInfo.GetTokenToUse()
|
||||
if token == "" {
|
||||
return "", errors.New("empty token returned by IdP")
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func (s *SSHClient) dialAndHandshake(host string, port int, clientConfig *gossh.ClientConfig) error {
|
||||
addr := net.JoinHostPort(host, strconv.Itoa(port))
|
||||
log.Infof("SSH: connecting to %s as %s", addr, clientConfig.User)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), sshDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
var dialer net.Dialer
|
||||
conn, err := dialer.DialContext(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial %s: %w", addr, err)
|
||||
}
|
||||
|
||||
sshConn, chans, reqs, err := gossh.NewClientConn(conn, addr, clientConfig)
|
||||
if err != nil {
|
||||
if cerr := conn.Close(); cerr != nil {
|
||||
log.Debugf("ssh: close after handshake error: %v", cerr)
|
||||
}
|
||||
return fmt.Errorf("ssh handshake: %w", err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.sshClient = gossh.NewClient(sshConn, chans, reqs)
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
|
||||
log.Infof("SSH: connected to %s", addr)
|
||||
if listener != nil {
|
||||
listener.OnConnected()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSHClient) readLoop(r io.Reader, name string) {
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
if n > 0 {
|
||||
s.mu.Lock()
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
if listener != nil {
|
||||
chunk := make([]byte, n)
|
||||
copy(chunk, buf[:n])
|
||||
listener.OnData(chunk)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh %s read: %v", name, err)
|
||||
}
|
||||
s.notifyClose(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSHClient) notifyClose(reason string) {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.closed = true
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
if listener != nil {
|
||||
listener.OnClose(reason)
|
||||
}
|
||||
}
|
||||
|
||||
// engineHostKeyVerifier adapts *internal.Engine to nbssh.HostKeyVerifier.
|
||||
type engineHostKeyVerifier struct {
|
||||
engine *internal.Engine
|
||||
}
|
||||
|
||||
func (v *engineHostKeyVerifier) VerifySSHHostKey(peerAddress string, presented []byte) error {
|
||||
storedKey, found := v.engine.GetPeerSSHKey(peerAddress)
|
||||
if !found {
|
||||
return nbssh.ErrPeerNotFound
|
||||
}
|
||||
return nbssh.VerifyHostKey(storedKey, presented, peerAddress)
|
||||
}
|
||||
|
||||
func detectServerType(host string, port int) detection.ServerType {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), sshDetectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
dialer := &net.Dialer{}
|
||||
serverType, err := detection.DetectSSHServerType(ctx, dialer, host, port)
|
||||
if err != nil {
|
||||
log.Debugf("ssh: server detection for %s:%d failed: %v (assuming regular SSH)", host, port, err)
|
||||
return detection.ServerTypeRegular
|
||||
}
|
||||
return serverType
|
||||
}
|
||||
|
||||
func closeQuiet(c io.Closer, label string) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if err := c.Close(); err != nil && !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh: close %s: %v", label, err)
|
||||
}
|
||||
}
|
||||
@@ -112,7 +112,7 @@ func (c *Controller) CountStreams() int {
|
||||
return c.peersUpdateManager.CountStreams()
|
||||
}
|
||||
|
||||
func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID string) error {
|
||||
func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
|
||||
log.WithContext(ctx).Tracef("updating peers for account %s from %s", accountID, util.GetCallerName())
|
||||
account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
||||
if err != nil {
|
||||
@@ -175,6 +175,10 @@ func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID strin
|
||||
continue
|
||||
}
|
||||
|
||||
if c.accountManagerMetrics != nil {
|
||||
c.accountManagerMetrics.CountNmapTriggered(string(reason.Resource), string(reason.Operation))
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{}
|
||||
go func(p *nbpeer.Peer) {
|
||||
@@ -242,14 +246,14 @@ func (c *Controller) bufferSendUpdateAccountPeers(ctx context.Context, accountID
|
||||
|
||||
go func() {
|
||||
defer b.mu.Unlock()
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID)
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID, reason)
|
||||
if !b.update.Load() {
|
||||
return
|
||||
}
|
||||
b.update.Store(false)
|
||||
if b.next == nil {
|
||||
b.next = time.AfterFunc(time.Duration(c.updateAccountPeersBufferInterval.Load()), func() {
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID)
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID, reason)
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -265,7 +269,7 @@ func (c *Controller) UpdateAccountPeers(ctx context.Context, accountID string, r
|
||||
if c.accountManagerMetrics != nil {
|
||||
c.accountManagerMetrics.CountUpdateAccountPeersTriggered(string(reason.Resource), string(reason.Operation))
|
||||
}
|
||||
return c.sendUpdateAccountPeers(ctx, accountID)
|
||||
return c.sendUpdateAccountPeers(ctx, accountID, reason)
|
||||
}
|
||||
|
||||
func (c *Controller) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) error {
|
||||
@@ -359,14 +363,14 @@ func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID str
|
||||
|
||||
go func() {
|
||||
defer b.mu.Unlock()
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID)
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID, reason)
|
||||
if !b.update.Load() {
|
||||
return
|
||||
}
|
||||
b.update.Store(false)
|
||||
if b.next == nil {
|
||||
b.next = time.AfterFunc(time.Duration(c.updateAccountPeersBufferInterval.Load()), func() {
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID)
|
||||
_ = c.sendUpdateAccountPeers(ctx, accountID, reason)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -932,11 +932,7 @@ func (s *Service) validateL4Target(target *Target) error {
|
||||
if target.TargetId == "" {
|
||||
return errors.New("target_id is required for L4 services")
|
||||
}
|
||||
// Cluster targets resolve their upstream host:port from the target's
|
||||
// own Host/Port fields just like the other L4 types — buildPathMappings
|
||||
// emits net.JoinHostPort(target.Host, target.Port) for every L4
|
||||
// target, so allowing port=0 here would let ":0" reach the proxy.
|
||||
if target.Port == 0 {
|
||||
if target.TargetType != TargetTypeCluster && target.Port == 0 {
|
||||
return errors.New("target port is required for L4 services")
|
||||
}
|
||||
switch target.TargetType {
|
||||
|
||||
@@ -1176,12 +1176,7 @@ func TestValidate_HTTPClusterTarget_RequiresDirectUpstream(t *testing.T) {
|
||||
assert.ErrorContains(t, rp.Validate(), "direct upstream disabled", "cluster target must reject direct_upstream=false")
|
||||
}
|
||||
|
||||
// TestValidate_L4ClusterTarget_RequiresPort confirms that an L4 cluster
|
||||
// target without an explicit port is rejected. buildPathMappings emits
|
||||
// net.JoinHostPort(target.Host, target.Port) for every L4 target — so
|
||||
// allowing port=0 would let the proxy ship ":0" upstreams. The port
|
||||
// requirement is the same as every other L4 target type.
|
||||
func TestValidate_L4ClusterTarget_RequiresPort(t *testing.T) {
|
||||
func TestValidate_L4ClusterTarget(t *testing.T) {
|
||||
rp := validProxy()
|
||||
rp.Mode = ModeTCP
|
||||
rp.ListenPort = 9000
|
||||
@@ -1191,12 +1186,7 @@ func TestValidate_L4ClusterTarget_RequiresPort(t *testing.T) {
|
||||
Protocol: "tcp",
|
||||
Enabled: true,
|
||||
}}
|
||||
assert.ErrorContains(t, rp.Validate(), "port is required",
|
||||
"L4 cluster target must require an explicit port like other L4 target types")
|
||||
|
||||
rp.Targets[0].Port = 5432
|
||||
rp.Targets[0].Host = "db.lan"
|
||||
require.NoError(t, rp.Validate(), "L4 cluster target with host:port must validate")
|
||||
require.NoError(t, rp.Validate(), "L4 cluster target must validate without an explicit port")
|
||||
}
|
||||
|
||||
func TestService_Copy_RoundtripsPrivate(t *testing.T) {
|
||||
|
||||
@@ -102,7 +102,7 @@ func generateSessionKeyPair(t *testing.T) (string, string) {
|
||||
|
||||
func createSessionToken(t *testing.T, privKeyB64, userID, domain string) string {
|
||||
t.Helper()
|
||||
token, err := sessionkey.SignToken(privKeyB64, userID, "", domain, auth.MethodOIDC, nil, nil, time.Hour)
|
||||
token, err := sessionkey.SignToken(privKeyB64, userID, domain, auth.MethodOIDC, nil, time.Hour)
|
||||
require.NoError(t, err)
|
||||
return token
|
||||
}
|
||||
@@ -394,10 +394,6 @@ func (m *testValidateSessionProxyManager) ClusterSupportsCrowdSec(_ context.Cont
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *testValidateSessionProxyManager) ClusterSupportsPrivate(_ context.Context, _ string) *bool {
|
||||
return nil
|
||||
}
|
||||
|
||||
type testValidateSessionUsersManager struct {
|
||||
store store.Store
|
||||
}
|
||||
@@ -405,24 +401,3 @@ type testValidateSessionUsersManager struct {
|
||||
func (m *testValidateSessionUsersManager) GetUser(ctx context.Context, userID string) (*types.User, error) {
|
||||
return m.store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
|
||||
}
|
||||
|
||||
func (m *testValidateSessionUsersManager) GetUserWithGroups(ctx context.Context, userID string) (*types.User, []*types.Group, error) {
|
||||
user, err := m.store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if len(user.AutoGroups) == 0 {
|
||||
return user, nil, nil
|
||||
}
|
||||
groupsMap, err := m.store.GetGroupsByIDs(ctx, store.LockingStrengthNone, user.AccountID, user.AutoGroups)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
groups := make([]*types.Group, 0, len(user.AutoGroups))
|
||||
for _, id := range user.AutoGroups {
|
||||
if g, ok := groupsMap[id]; ok && g != nil {
|
||||
groups = append(groups, g)
|
||||
}
|
||||
}
|
||||
return user, groups, nil
|
||||
}
|
||||
|
||||
@@ -4734,13 +4734,7 @@ func (s *SqlStore) GetPeerByIP(ctx context.Context, lockStrength LockingStrength
|
||||
result := tx.
|
||||
Take(&peer, fmt.Sprintf("account_id = ? AND %s = ?", column), accountID, jsonValue)
|
||||
if result.Error != nil {
|
||||
// A tunnel-IP miss is an expected outcome (e.g. the proxy's
|
||||
// ValidateTunnelPeer probing an address that isn't in the
|
||||
// account roster); surface it as NotFound so callers can tell
|
||||
// it apart from a real store failure.
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, status.Errorf(status.NotFound, "peer with ip %s not found", ip.String())
|
||||
}
|
||||
// no logging here
|
||||
return nil, status.Errorf(status.Internal, "failed to get peer from store")
|
||||
}
|
||||
|
||||
@@ -5968,7 +5962,6 @@ func (s *SqlStore) getClusterCapability(ctx context.Context, clusterAddr, column
|
||||
}
|
||||
|
||||
err := s.db.
|
||||
WithContext(ctx).
|
||||
Model(&proxy.Proxy{}).
|
||||
Select("COUNT(CASE WHEN "+column+" IS NOT NULL THEN 1 END) > 0 AS has_capability, "+
|
||||
"COALESCE(MAX(CASE WHEN "+column+" = true THEN 1 ELSE 0 END), 0) = 1 AS any_true").
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func TestSqlStore_GetAccount_PrivateServiceRoundtrip(t *testing.T) {
|
||||
if os.Getenv("CI") == "true" && (runtime.GOOS == "darwin" || runtime.GOOS == "windows") {
|
||||
if (os.Getenv("CI") == "true" && runtime.GOOS == "darwin") || runtime.GOOS == "windows" {
|
||||
t.Skip("skip CI tests on darwin and windows")
|
||||
}
|
||||
|
||||
|
||||
@@ -491,27 +491,6 @@ func Test_GetAccount(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TestSqlStore_GetPeerByIP_NotFound pins the not-found semantics the
|
||||
// proxy's ValidateTunnelPeer relies on: a tunnel-IP that isn't in the
|
||||
// account roster must surface as a NotFound error (not a generic
|
||||
// Internal) so callers can distinguish an expected miss from a real
|
||||
// store failure. A known IP still resolves.
|
||||
func TestSqlStore_GetPeerByIP_NotFound(t *testing.T) {
|
||||
runTestForAllEngines(t, "../testdata/store.sql", func(t *testing.T, store Store) {
|
||||
const accountID = "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||
|
||||
peer, err := store.GetPeerByIP(context.Background(), LockingStrengthNone, accountID, net.ParseIP("192.168.0.0"))
|
||||
require.NoError(t, err, "known tunnel IP must resolve")
|
||||
require.NotNil(t, peer)
|
||||
|
||||
_, err = store.GetPeerByIP(context.Background(), LockingStrengthNone, accountID, net.ParseIP("100.65.0.99"))
|
||||
require.Error(t, err, "unknown tunnel IP must error")
|
||||
parsedErr, ok := status.FromError(err)
|
||||
require.True(t, ok, "error must be a status error")
|
||||
require.Equal(t, status.NotFound, parsedErr.Type(), "tunnel-IP miss must be NotFound, not Internal")
|
||||
})
|
||||
}
|
||||
|
||||
func TestSqlStore_SavePeer(t *testing.T) {
|
||||
store, cleanUp, err := NewTestStoreFromSQL(context.Background(), "../testdata/store.sql", t.TempDir())
|
||||
t.Cleanup(cleanUp)
|
||||
|
||||
@@ -13,6 +13,7 @@ type AccountManagerMetrics struct {
|
||||
ctx context.Context
|
||||
updateAccountPeersDurationMs metric.Float64Histogram
|
||||
updateAccountPeersCounter metric.Int64Counter
|
||||
nmapCounter metric.Int64Counter
|
||||
getPeerNetworkMapDurationMs metric.Float64Histogram
|
||||
networkMapObjectCount metric.Int64Histogram
|
||||
peerMetaUpdateCount metric.Int64Counter
|
||||
@@ -59,6 +60,13 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nmapCounter, err := meter.Int64Counter("management.network.map.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of network maps computed, labeled by resource and operation trigger"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerMetaUpdateCount, err := meter.Int64Counter("management.account.peer.meta.update.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of updates with new meta data from the peers"))
|
||||
@@ -93,6 +101,7 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
|
||||
peerMetaUpdateCount: peerMetaUpdateCount,
|
||||
peerStatusUpdateCounter: peerStatusUpdateCounter,
|
||||
peerStatusUpdateDurationMs: peerStatusUpdateDurationMs,
|
||||
nmapCounter: nmapCounter,
|
||||
}, nil
|
||||
|
||||
}
|
||||
@@ -145,6 +154,16 @@ func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource,
|
||||
)
|
||||
}
|
||||
|
||||
// CountNmapTriggered increments the counter for calculated network maps with resource and operation labels.
|
||||
func (metrics *AccountManagerMetrics) CountNmapTriggered(resource, operation string) {
|
||||
metrics.nmapCounter.Add(metrics.ctx, 1,
|
||||
metric.WithAttributes(
|
||||
attribute.String("resource", resource),
|
||||
attribute.String("operation", operation),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// CountPeerMetUpdate counts the number of peer meta updates
|
||||
func (metrics *AccountManagerMetrics) CountPeerMetUpdate() {
|
||||
metrics.peerMetaUpdateCount.Add(metrics.ctx, 1)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
stdlog "log"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -43,7 +42,7 @@ const privateInboundPortHTTPS = 443
|
||||
const privateInboundPortHTTP = 80
|
||||
|
||||
// inboundManager wires per-account inbound listeners into the proxy
|
||||
// pipeline when --private is enabled. When disabled the manager
|
||||
// pipeline when --private-inbound is enabled. When disabled the manager
|
||||
// is nil and every method on *Server that touches it short-circuits.
|
||||
type inboundManager struct {
|
||||
logger *log.Logger
|
||||
@@ -56,18 +55,15 @@ type inboundManager struct {
|
||||
}
|
||||
|
||||
// inboundEntry owns the listeners, router and HTTP servers for a single
|
||||
// account's embedded netstack. errorLogWriters retain the logrus pipe
|
||||
// writers backing each http.Server's ErrorLog so tearDown can close
|
||||
// them — otherwise the pipe + its scanner goroutine leak per account.
|
||||
// account's embedded netstack.
|
||||
type inboundEntry struct {
|
||||
router *nbtcp.Router
|
||||
tlsListener net.Listener
|
||||
plainListener net.Listener
|
||||
httpsServer *http.Server
|
||||
httpServer *http.Server
|
||||
errorLogWriters []*io.PipeWriter
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
router *nbtcp.Router
|
||||
tlsListener net.Listener
|
||||
plainListener net.Listener
|
||||
httpsServer *http.Server
|
||||
httpServer *http.Server
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// pendingInboundRoute holds a route that arrived before the account's
|
||||
@@ -151,34 +147,30 @@ func (m *inboundManager) bringUp(ctx context.Context, accountID types.AccountID,
|
||||
return types.WithOverlayOrigin(ctx)
|
||||
}
|
||||
|
||||
httpsErrLog, httpsErrW := newInboundErrorLog(m.logger, "https", accountID)
|
||||
httpErrLog, httpErrW := newInboundErrorLog(m.logger, "http", accountID)
|
||||
|
||||
httpsServer := &http.Server{
|
||||
Handler: scopedHandler,
|
||||
TLSConfig: m.tlsConfig,
|
||||
ReadHeaderTimeout: httpInboundReadHeaderTimeout,
|
||||
IdleTimeout: httpInboundIdleTimeout,
|
||||
ErrorLog: httpsErrLog,
|
||||
ErrorLog: newInboundErrorLog(m.logger, "https", accountID),
|
||||
ConnContext: markOverlayOrigin,
|
||||
}
|
||||
httpServer := &http.Server{
|
||||
Handler: scopedHandler,
|
||||
ReadHeaderTimeout: httpInboundReadHeaderTimeout,
|
||||
IdleTimeout: httpInboundIdleTimeout,
|
||||
ErrorLog: httpErrLog,
|
||||
ErrorLog: newInboundErrorLog(m.logger, "http", accountID),
|
||||
ConnContext: markOverlayOrigin,
|
||||
}
|
||||
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
entry := &inboundEntry{
|
||||
router: router,
|
||||
tlsListener: tlsListener,
|
||||
plainListener: plainListener,
|
||||
httpsServer: httpsServer,
|
||||
httpServer: httpServer,
|
||||
errorLogWriters: []*io.PipeWriter{httpsErrW, httpErrW},
|
||||
cancel: cancel,
|
||||
router: router,
|
||||
tlsListener: tlsListener,
|
||||
plainListener: plainListener,
|
||||
httpsServer: httpsServer,
|
||||
httpServer: httpServer,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
entry.wg.Add(1)
|
||||
@@ -245,14 +237,6 @@ func (m *inboundManager) tearDown(accountID types.AccountID, entry *inboundEntry
|
||||
m.logger.Debugf("close per-account plain listener: %v", err)
|
||||
}
|
||||
entry.wg.Wait()
|
||||
// Close the ErrorLog pipes only after the http.Servers have fully
|
||||
// stopped so any straggling stdlib write doesn't race with the
|
||||
// close. Each writer also tears down the logrus scanner goroutine.
|
||||
for _, w := range entry.errorLogWriters {
|
||||
if err := w.Close(); err != nil {
|
||||
m.logger.Debugf("close per-account inbound error log writer: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddRoute records an SNI/host route on the account's per-account router.
|
||||
@@ -390,7 +374,7 @@ func (m *inboundManager) ListenerInfo(accountID types.AccountID) (InboundListene
|
||||
}
|
||||
|
||||
// Snapshot returns the inbound listener state for every account that has
|
||||
// a live listener at call time. Empty when --private is off or
|
||||
// a live listener at call time. Empty when --private-inbound is off or
|
||||
// no accounts have come up yet.
|
||||
func (m *inboundManager) Snapshot() map[types.AccountID]InboundListenerInfo {
|
||||
if m == nil {
|
||||
@@ -513,7 +497,7 @@ func accountTunnelLookup(client *embed.Client) auth.TunnelLookupFunc {
|
||||
// peerstore lookup to every request's context before delegating to next.
|
||||
// Calling on the host-level listener is a no-op because that path never
|
||||
// installs this wrapper, so the existing behaviour stays byte-for-byte
|
||||
// identical when --private is off or the request didn't arrive
|
||||
// identical when --private-inbound is off or the request didn't arrive
|
||||
// on a per-account listener.
|
||||
func withTunnelLookup(next http.Handler, lookup auth.TunnelLookupFunc) http.Handler {
|
||||
if lookup == nil {
|
||||
@@ -554,14 +538,10 @@ func (a inboundDebugAdapter) InboundListeners() map[types.AccountID]debug.Inboun
|
||||
}
|
||||
|
||||
// newInboundErrorLog routes a per-account http.Server's stdlib error
|
||||
// stream through logrus at warn level. The returned PipeWriter must be
|
||||
// closed by the caller (tearDown) once the http.Server has shut down —
|
||||
// otherwise the pipe and its scanner goroutine leak per account, see
|
||||
// logrus.Entry.WriterLevel.
|
||||
func newInboundErrorLog(logger *log.Logger, scheme string, accountID types.AccountID) (*stdlog.Logger, *io.PipeWriter) {
|
||||
w := logger.WithFields(log.Fields{
|
||||
// stream through logrus at warn level.
|
||||
func newInboundErrorLog(logger *log.Logger, scheme string, accountID types.AccountID) *stdlog.Logger {
|
||||
return stdlog.New(logger.WithFields(log.Fields{
|
||||
"inbound-http": scheme,
|
||||
"account_id": accountID,
|
||||
}).WriterLevel(log.WarnLevel)
|
||||
return stdlog.New(w, "", 0), w
|
||||
}).WriterLevel(log.WarnLevel), "", 0)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -140,7 +139,7 @@ func TestInboundManager_AddRouteAfterReady_RegistersDirectly(t *testing.T) {
|
||||
|
||||
// TestPrivateCapability_DerivedFromPrivateOnly tests that the capability
|
||||
// bit reported upstream tracks --private exclusively. The previous
|
||||
// --private flag has been folded into --private.
|
||||
// --private-inbound flag has been folded into --private.
|
||||
func TestPrivateCapability_DerivedFromPrivateOnly(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@@ -319,7 +318,7 @@ func TestInboundManager_ListenerInfo(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestInboundManager_NilManagerSafe ensures the observability accessors
|
||||
// are safe to call when --private is off (nil manager).
|
||||
// are safe to call when --private-inbound is off (nil manager).
|
||||
func TestInboundManager_NilManagerSafe(t *testing.T) {
|
||||
var mgr *inboundManager
|
||||
_, ok := mgr.ListenerInfo("anything")
|
||||
@@ -483,38 +482,6 @@ func selfSignedTLSConfig(t *testing.T) *tls.Config {
|
||||
return &tls.Config{Certificates: []tls.Certificate{cert}, MinVersion: tls.VersionTLS12} //nolint:gosec
|
||||
}
|
||||
|
||||
// TestNewInboundErrorLog_WriterIsCloseable guards the close path on the
|
||||
// logrus PipeWriter that backs each per-account http.Server's ErrorLog.
|
||||
// logrus.Entry.WriterLevel returns an *io.PipeWriter that owns a pipe +
|
||||
// scanner goroutine; the caller must Close() it on teardown or the
|
||||
// resources leak per account. The contract is verified two ways:
|
||||
//
|
||||
// - the constructor returns a non-nil writer the caller can keep,
|
||||
// - writing to the writer after Close() fails with io.ErrClosedPipe,
|
||||
// which is the only externally observable sign that Close was wired.
|
||||
//
|
||||
// A leaking refactor (forgetting to thread the writer to tearDown, or
|
||||
// dropping the Close call) would still pass this test individually but
|
||||
// fail an integration goleak check; this unit test is the cheap first
|
||||
// line of defence.
|
||||
func TestNewInboundErrorLog_WriterIsCloseable(t *testing.T) {
|
||||
logger := quietLogger()
|
||||
stdLog, writer := newInboundErrorLog(logger, "https", types.AccountID("acct-1"))
|
||||
|
||||
require.NotNil(t, stdLog, "newInboundErrorLog must return a non-nil *log.Logger")
|
||||
require.NotNil(t, writer, "newInboundErrorLog must return the underlying PipeWriter so tearDown can Close it")
|
||||
|
||||
// First Close succeeds.
|
||||
require.NoError(t, writer.Close(), "PipeWriter.Close should succeed the first time")
|
||||
|
||||
// After Close, the writer must refuse new writes — that's the only
|
||||
// behavioural signal that the pipe (and its scanner goroutine) has
|
||||
// shut down.
|
||||
_, err := writer.Write([]byte("post-close write\n"))
|
||||
require.ErrorIs(t, err, io.ErrClosedPipe,
|
||||
"writes after Close must surface io.ErrClosedPipe so callers know the writer is gone")
|
||||
}
|
||||
|
||||
// testCertPEM / testKeyPEM are a minimal RSA self-signed cert for
|
||||
// 127.0.0.1 — only used by tests that need a working TLS handshake.
|
||||
var testCertPEM = []byte(`-----BEGIN CERTIFICATE-----
|
||||
|
||||
@@ -346,15 +346,13 @@ func (mw *Middleware) forwardWithSessionCookie(w http.ResponseWriter, r *http.Re
|
||||
// management unreachable, peer unknown, user not in group) returns false so
|
||||
// the caller falls back to the existing OIDC scheme dispatch.
|
||||
//
|
||||
// The fast-path is gated on TunnelLookupFromContext(r.Context()) being
|
||||
// present — that context value is attached only by the per-account
|
||||
// inbound (overlay) listener. The host listener never sets it, so a
|
||||
// public client whose source IP happens to fall inside an RFC1918 / ULA
|
||||
// / CGNAT range can't impersonate a mesh peer by colliding with a
|
||||
// tunnel-IP. Once we know the request arrived over WireGuard the
|
||||
// per-account peerstore lookup is consulted: a miss denies fast (no
|
||||
// management round-trip), a hit gates the cached ValidateTunnelPeer RPC
|
||||
// that mints the session JWT.
|
||||
// Phase 3 adds a local-first short-circuit: when the request arrived on a
|
||||
// per-account inbound listener the context carries a peerstore lookup
|
||||
// (TunnelLookupFromContext). If the lookup says the IP isn't in the account's
|
||||
// roster the proxy denies fast without calling management. If the lookup
|
||||
// confirms a known peer the RPC still runs for the user-identity tail
|
||||
// (UserID + group access), but its result is cached for tunnelCacheTTL so
|
||||
// repeat requests skip management entirely.
|
||||
func (mw *Middleware) forwardWithTunnelPeer(w http.ResponseWriter, r *http.Request, host string, config DomainConfig, next http.Handler) bool {
|
||||
if mw.sessionValidator == nil {
|
||||
return false
|
||||
@@ -363,24 +361,18 @@ func (mw *Middleware) forwardWithTunnelPeer(w http.ResponseWriter, r *http.Reque
|
||||
if !clientIP.IsValid() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Anti-spoof: only honour the tunnel-peer fast-path on requests that
|
||||
// were stamped by an overlay listener. Without that marker an
|
||||
// attacker could send a request from a colliding RFC1918 / CGNAT
|
||||
// source on the public listener and bypass operator auth.
|
||||
lookup := TunnelLookupFromContext(r.Context())
|
||||
if lookup == nil {
|
||||
return false
|
||||
}
|
||||
if !isTunnelSourceIP(clientIP) {
|
||||
return false
|
||||
}
|
||||
if _, ok := lookup(clientIP); !ok {
|
||||
mw.logger.WithFields(log.Fields{
|
||||
"host": host,
|
||||
"remote": clientIP,
|
||||
}).Debug("local peerstore: tunnel IP not in account roster; denying without RPC")
|
||||
return false
|
||||
|
||||
if lookup := TunnelLookupFromContext(r.Context()); lookup != nil {
|
||||
if _, ok := lookup(clientIP); !ok {
|
||||
mw.logger.WithFields(log.Fields{
|
||||
"host": host,
|
||||
"remote": clientIP,
|
||||
}).Debug("local peerstore: tunnel IP not in account roster; denying without RPC")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
resp, _, err := mw.tunnelCache.fetch(r.Context(), tunnelCacheKey{
|
||||
|
||||
@@ -1227,93 +1227,3 @@ func TestProtect_NonOIDCSchemes_PlainHTTP_NotBlocked(t *testing.T) {
|
||||
|
||||
assert.Equal(t, http.StatusUnauthorized, rec.Code, "PIN-only domain should serve the login page on plain HTTP")
|
||||
}
|
||||
|
||||
// stubTunnelValidator records ValidateTunnelPeer calls so a test can
|
||||
// assert whether the fast-path reached management.
|
||||
type stubTunnelValidator struct {
|
||||
called bool
|
||||
resp *proto.ValidateTunnelPeerResponse
|
||||
}
|
||||
|
||||
func (s *stubTunnelValidator) ValidateSession(context.Context, *proto.ValidateSessionRequest, ...grpc.CallOption) (*proto.ValidateSessionResponse, error) {
|
||||
return nil, errors.New("not used in this test")
|
||||
}
|
||||
|
||||
func (s *stubTunnelValidator) ValidateTunnelPeer(context.Context, *proto.ValidateTunnelPeerRequest, ...grpc.CallOption) (*proto.ValidateTunnelPeerResponse, error) {
|
||||
s.called = true
|
||||
return s.resp, nil
|
||||
}
|
||||
|
||||
// TestProtect_TunnelPeerFastPath_RequiresInboundMarker guards the
|
||||
// anti-spoof gate: a request with an RFC1918 source IP arriving on the
|
||||
// public listener (no TunnelLookupFromContext attached) must not be
|
||||
// allowed to take the tunnel-peer fast-path. Without this gate a public
|
||||
// client whose source IP happens to fall inside an RFC1918 range could
|
||||
// bypass the configured auth scheme by colliding with a known tunnel
|
||||
// IP.
|
||||
func TestProtect_TunnelPeerFastPath_RequiresInboundMarker(t *testing.T) {
|
||||
validator := &stubTunnelValidator{
|
||||
resp: &proto.ValidateTunnelPeerResponse{
|
||||
Valid: true,
|
||||
SessionToken: "should-not-be-used",
|
||||
UserId: "user-1",
|
||||
},
|
||||
}
|
||||
mw := NewMiddleware(log.StandardLogger(), validator, nil)
|
||||
kp := generateTestKeyPair(t)
|
||||
|
||||
scheme := &stubScheme{method: auth.MethodPIN, promptID: "pin"}
|
||||
require.NoError(t, mw.AddDomain("example.com", []Scheme{scheme}, kp.PublicKey, time.Hour, "", "", nil, false))
|
||||
|
||||
handler := mw.Protect(newPassthroughHandler())
|
||||
|
||||
// Request from an RFC1918 source IP on the public listener — no
|
||||
// TunnelLookupFromContext attached. The fast-path must reject this
|
||||
// and fall through to the PIN scheme (which renders 401 on plain
|
||||
// HTTP for a non-authenticated request).
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req.RemoteAddr = "100.64.0.5:5000"
|
||||
rec := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, req)
|
||||
|
||||
assert.False(t, validator.called,
|
||||
"ValidateTunnelPeer must not be invoked when the request lacks the inbound TunnelLookup marker")
|
||||
assert.Equal(t, http.StatusUnauthorized, rec.Code,
|
||||
"without the inbound marker the request must fall through to the operator auth scheme")
|
||||
}
|
||||
|
||||
// TestProtect_TunnelPeerFastPath_TakesPathWithInboundMarker verifies
|
||||
// the positive side: a request marked as overlay-origin (carrying the
|
||||
// TunnelLookup context value) and matching a tunnel-IP range does take
|
||||
// the fast-path and reach management.
|
||||
func TestProtect_TunnelPeerFastPath_TakesPathWithInboundMarker(t *testing.T) {
|
||||
validator := &stubTunnelValidator{
|
||||
resp: &proto.ValidateTunnelPeerResponse{
|
||||
Valid: true,
|
||||
SessionToken: "tunnel-session-token",
|
||||
UserId: "user-1",
|
||||
},
|
||||
}
|
||||
mw := NewMiddleware(log.StandardLogger(), validator, nil)
|
||||
kp := generateTestKeyPair(t)
|
||||
|
||||
scheme := &stubScheme{method: auth.MethodPIN, promptID: "pin"}
|
||||
require.NoError(t, mw.AddDomain("example.com", []Scheme{scheme}, kp.PublicKey, time.Hour, "", "", nil, false))
|
||||
|
||||
handler := mw.Protect(newPassthroughHandler())
|
||||
|
||||
lookup := TunnelLookupFunc(func(_ netip.Addr) (PeerIdentity, bool) {
|
||||
return PeerIdentity{}, true
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://example.com/", nil)
|
||||
req.RemoteAddr = "100.64.0.5:5000"
|
||||
req = req.WithContext(WithTunnelLookup(req.Context(), lookup))
|
||||
rec := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, req)
|
||||
|
||||
assert.True(t, validator.called,
|
||||
"ValidateTunnelPeer must run when the request carries the inbound TunnelLookup marker")
|
||||
assert.Equal(t, http.StatusOK, rec.Code,
|
||||
"a successful tunnel-peer validation must forward to the next handler")
|
||||
}
|
||||
|
||||
@@ -101,10 +101,7 @@ func TestForwardWithTunnelPeer_GroupsPropagateToCapturedData(t *testing.T) {
|
||||
|
||||
w, r := newTunnelRequest("100.64.0.10:55555")
|
||||
cd := proxy.NewCapturedData("")
|
||||
lookup := TunnelLookupFunc(func(_ netip.Addr) (PeerIdentity, bool) {
|
||||
return PeerIdentity{}, true
|
||||
})
|
||||
r = r.WithContext(proxy.WithCapturedData(WithTunnelLookup(r.Context(), lookup), cd))
|
||||
r = r.WithContext(proxy.WithCapturedData(r.Context(), cd))
|
||||
|
||||
called := false
|
||||
next := http.HandlerFunc(func(http.ResponseWriter, *http.Request) { called = true })
|
||||
@@ -151,13 +148,9 @@ func TestForwardWithTunnelPeer_LocalLookupKnownPeerStillRPCs(t *testing.T) {
|
||||
assert.Equal(t, int32(1), validator.tunnelCalls.Load(), "RPC must run for the user-identity tail when local lookup confirms the peer")
|
||||
}
|
||||
|
||||
// TestForwardWithTunnelPeer_NoLookupRefusesFastPath guards the
|
||||
// anti-spoof gate: requests that didn't arrive on the per-account
|
||||
// inbound listener (no TunnelLookup attached) must never reach
|
||||
// management's ValidateTunnelPeer, even when the source IP looks like
|
||||
// a tunnel address. A colliding RFC1918 / CGNAT source on the public
|
||||
// listener would otherwise impersonate a mesh peer.
|
||||
func TestForwardWithTunnelPeer_NoLookupRefusesFastPath(t *testing.T) {
|
||||
// TestForwardWithTunnelPeer_NoLookupKeepsLegacyPath ensures the existing
|
||||
// behaviour stays intact on the host-level listener (no lookup attached).
|
||||
func TestForwardWithTunnelPeer_NoLookupKeepsLegacyPath(t *testing.T) {
|
||||
validator := &stubSessionValidator{
|
||||
respFn: func(_ *proto.ValidateTunnelPeerRequest) *proto.ValidateTunnelPeerResponse {
|
||||
return &proto.ValidateTunnelPeerResponse{Valid: true, SessionToken: "tok", UserId: "user-1"}
|
||||
@@ -172,9 +165,9 @@ func TestForwardWithTunnelPeer_NoLookupRefusesFastPath(t *testing.T) {
|
||||
config, _ := mw.getDomainConfig("svc.example")
|
||||
handled := mw.forwardWithTunnelPeer(w, r, "svc.example", config, next)
|
||||
|
||||
assert.False(t, handled, "fast-path must refuse without the inbound marker")
|
||||
assert.False(t, called, "next handler must not run")
|
||||
assert.Equal(t, int32(0), validator.tunnelCalls.Load(), "ValidateTunnelPeer must not be invoked without the inbound marker")
|
||||
assert.True(t, handled, "host-level path forwards on positive RPC result")
|
||||
assert.True(t, called, "next handler runs on host-level success")
|
||||
assert.Equal(t, int32(1), validator.tunnelCalls.Load(), "host-level path always RPCs (Phase 3 unchanged)")
|
||||
}
|
||||
|
||||
// TestForwardWithTunnelPeer_RPCErrorFallsThrough validates that an RPC
|
||||
@@ -208,13 +201,8 @@ func TestForwardWithTunnelPeer_CacheReusesPositiveResponse(t *testing.T) {
|
||||
}
|
||||
mw := newTunnelMiddleware(t, validator)
|
||||
|
||||
lookup := TunnelLookupFunc(func(_ netip.Addr) (PeerIdentity, bool) {
|
||||
return PeerIdentity{}, true
|
||||
})
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
w, r := newTunnelRequest("100.64.0.10:55555")
|
||||
r = r.WithContext(WithTunnelLookup(r.Context(), lookup))
|
||||
next := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
|
||||
config, _ := mw.getDomainConfig("svc.example")
|
||||
handled := mw.forwardWithTunnelPeer(w, r, "svc.example", config, next)
|
||||
@@ -238,21 +226,11 @@ func TestForwardWithTunnelPeer_RoutesAccountIDIntoCacheKey(t *testing.T) {
|
||||
require.NoError(t, mw.AddDomain("svc-a.example", nil, "", 0, "acct-a", "svc-a", nil, false))
|
||||
require.NoError(t, mw.AddDomain("svc-b.example", nil, "", 0, "acct-b", "svc-b", nil, false))
|
||||
|
||||
// The fast-path requires the inbound-listener marker on the context.
|
||||
// The peerstore lookup itself is account-agnostic at this level
|
||||
// (one TunnelLookupFunc per account is attached by inbound.go); a
|
||||
// trivial "always hit" lookup is enough to exercise the cache-key
|
||||
// branch this test covers.
|
||||
lookup := TunnelLookupFunc(func(_ netip.Addr) (PeerIdentity, bool) {
|
||||
return PeerIdentity{}, true
|
||||
})
|
||||
|
||||
for _, host := range []string{"svc-a.example", "svc-b.example"} {
|
||||
w := httptest.NewRecorder()
|
||||
r := httptest.NewRequest(http.MethodGet, "https://"+host+"/", nil)
|
||||
r.Host = host
|
||||
r.RemoteAddr = "100.64.0.10:55555"
|
||||
r = r.WithContext(WithTunnelLookup(r.Context(), lookup))
|
||||
config, _ := mw.getDomainConfig(host)
|
||||
handled := mw.forwardWithTunnelPeer(w, r, host, config, http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
|
||||
require.True(t, handled, "host %s should forward", host)
|
||||
@@ -336,17 +314,9 @@ func TestPrivateService_ForwardsOnTunnelPeerSuccess(t *testing.T) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
// Per-account inbound listener attaches WithTunnelLookup; without it
|
||||
// forwardWithTunnelPeer refuses to take the fast-path. Mirror the
|
||||
// real flow so this test exercises the post-gating success branch.
|
||||
lookup := TunnelLookupFunc(func(_ netip.Addr) (PeerIdentity, bool) {
|
||||
return PeerIdentity{}, true
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "https://private.svc/", nil)
|
||||
req.Host = "private.svc"
|
||||
req.RemoteAddr = "100.64.0.10:55555"
|
||||
req = req.WithContext(WithTunnelLookup(req.Context(), lookup))
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
|
||||
@@ -131,7 +131,7 @@ func (h *Handler) SetCertStatus(cs certStatus) {
|
||||
|
||||
// SetInboundProvider wires per-account inbound listener observability.
|
||||
// Pass nil (or skip the call) to keep the inbound section out of debug
|
||||
// responses on proxies that don't run --private.
|
||||
// responses on proxies that don't run --private-inbound.
|
||||
func (h *Handler) SetInboundProvider(p InboundProvider) {
|
||||
h.inbound = p
|
||||
}
|
||||
|
||||
@@ -66,22 +66,6 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Loop guard for private services: a peer that hosts the target
|
||||
// dialing its own service URL would round-trip its own traffic
|
||||
// through the proxy and back over WG to itself. Refuse the request
|
||||
// with 421 (Misdirected Request) so the caller sees an explicit
|
||||
// error instead of silently doubling tunnel traffic.
|
||||
if p.isSelfTargetLoop(r, result.target.URL) {
|
||||
if cd := CapturedDataFromContext(r.Context()); cd != nil {
|
||||
cd.SetOrigin(OriginNoRoute)
|
||||
}
|
||||
requestID := getRequestID(r)
|
||||
web.ServeErrorPage(w, r, http.StatusMisdirectedRequest, "Loop Detected",
|
||||
"This peer is the target of the requested service. Reach the backend directly instead of dialing the public service URL from the same machine.",
|
||||
requestID, web.ErrorStatus{Proxy: true, Destination: false})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
// Set the account ID in the context for the roundtripper to use.
|
||||
ctx = roundtrip.WithAccountID(ctx, result.accountID)
|
||||
@@ -123,32 +107,6 @@ func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
rp.ServeHTTP(w, r.WithContext(ctx))
|
||||
}
|
||||
|
||||
// isSelfTargetLoop reports whether an overlay-origin request is about to
|
||||
// be forwarded back to the very peer that initiated it. The detection
|
||||
// is intentionally narrow: it only fires when the request arrived on
|
||||
// the per-account inbound (overlay) listener (so we're confident the
|
||||
// source address is the caller's tunnel IP), and only when the resolved
|
||||
// target host matches that tunnel IP. Catching this here returns 421 to
|
||||
// the caller instead of letting the proxy round-trip its own traffic
|
||||
// over WG twice.
|
||||
func (p *ReverseProxy) isSelfTargetLoop(r *http.Request, target *url.URL) bool {
|
||||
if target == nil {
|
||||
return false
|
||||
}
|
||||
if !types.IsOverlayOrigin(r.Context()) {
|
||||
return false
|
||||
}
|
||||
srcIP := extractHostIP(r.RemoteAddr)
|
||||
if !srcIP.IsValid() {
|
||||
return false
|
||||
}
|
||||
targetIP, err := netip.ParseAddr(target.Hostname())
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return srcIP.Unmap() == targetIP.Unmap()
|
||||
}
|
||||
|
||||
// rewriteFunc returns a Rewrite function for httputil.ReverseProxy that rewrites
|
||||
// inbound requests to target the backend service while setting security-relevant
|
||||
// forwarding headers and stripping proxy authentication credentials.
|
||||
|
||||
@@ -20,7 +20,6 @@ import (
|
||||
|
||||
"github.com/netbirdio/netbird/proxy/auth"
|
||||
"github.com/netbirdio/netbird/proxy/internal/roundtrip"
|
||||
"github.com/netbirdio/netbird/proxy/internal/types"
|
||||
"github.com/netbirdio/netbird/proxy/web"
|
||||
)
|
||||
|
||||
@@ -1286,103 +1285,6 @@ func TestStampNetBirdIdentity_OmitsGroupsHeaderWhenAllInvalid(t *testing.T) {
|
||||
"X-NetBird-Groups must not be set when every group label is rejected")
|
||||
}
|
||||
|
||||
// nopOKTransport returns 200 for every request without dialing — used
|
||||
// by the self-target-loop tests so the non-loop cases don't pay a real
|
||||
// TCP-dial timeout.
|
||||
type nopOKTransport struct{}
|
||||
|
||||
func (nopOKTransport) RoundTrip(*http.Request) (*http.Response, error) {
|
||||
return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Header: http.Header{}}, nil
|
||||
}
|
||||
|
||||
// TestServeHTTP_SelfTargetLoopReturns421 covers the loop guard for
|
||||
// private services: when a peer dials a service whose only target is
|
||||
// the peer itself, the proxy must refuse with 421 (Misdirected
|
||||
// Request) rather than round-tripping the request back over WG to
|
||||
// the same peer.
|
||||
func TestServeHTTP_SelfTargetLoopReturns421(t *testing.T) {
|
||||
rp := NewReverseProxy(nopOKTransport{}, "auto", nil, nil)
|
||||
rp.AddMapping(Mapping{
|
||||
ID: "svc-1",
|
||||
AccountID: "acct-1",
|
||||
Host: "private.svc",
|
||||
Paths: map[string]*PathTarget{
|
||||
"/": {
|
||||
URL: &url.URL{Scheme: "http", Host: "100.64.0.5:8080"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://private.svc/", nil)
|
||||
req.Host = "private.svc"
|
||||
req.RemoteAddr = "100.64.0.5:55555"
|
||||
req = req.WithContext(types.WithOverlayOrigin(req.Context()))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
rp.ServeHTTP(rec, req)
|
||||
|
||||
assert.Equal(t, http.StatusMisdirectedRequest, rec.Code,
|
||||
"a peer dialing a service whose target is itself must get 421")
|
||||
}
|
||||
|
||||
// TestServeHTTP_SelfTargetLoop_NonOverlayRequestPassesThrough verifies
|
||||
// the guard is scoped to overlay-origin requests. A public-listener
|
||||
// request that happens to share a source IP with the target host must
|
||||
// not be misinterpreted as a loop — the gating relies on the inbound
|
||||
// marker being attached only by the per-account overlay listener.
|
||||
func TestServeHTTP_SelfTargetLoop_NonOverlayRequestPassesThrough(t *testing.T) {
|
||||
rp := NewReverseProxy(nopOKTransport{}, "auto", nil, nil)
|
||||
rp.AddMapping(Mapping{
|
||||
ID: "svc-1",
|
||||
AccountID: "acct-1",
|
||||
Host: "public.svc",
|
||||
Paths: map[string]*PathTarget{
|
||||
"/": {
|
||||
URL: &url.URL{Scheme: "http", Host: "100.64.0.5:8080"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://public.svc/", nil)
|
||||
req.Host = "public.svc"
|
||||
req.RemoteAddr = "100.64.0.5:55555"
|
||||
// No WithOverlayOrigin → the guard must not fire.
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
rp.ServeHTTP(rec, req)
|
||||
|
||||
assert.NotEqual(t, http.StatusMisdirectedRequest, rec.Code,
|
||||
"a non-overlay request with a colliding source IP must not be flagged as a loop")
|
||||
}
|
||||
|
||||
// TestServeHTTP_SelfTargetLoop_OverlayDifferentIPPassesThrough confirms
|
||||
// that overlay-origin requests with a source IP that does *not* match
|
||||
// the target host are forwarded normally.
|
||||
func TestServeHTTP_SelfTargetLoop_OverlayDifferentIPPassesThrough(t *testing.T) {
|
||||
rp := NewReverseProxy(nopOKTransport{}, "auto", nil, nil)
|
||||
rp.AddMapping(Mapping{
|
||||
ID: "svc-1",
|
||||
AccountID: "acct-1",
|
||||
Host: "private.svc",
|
||||
Paths: map[string]*PathTarget{
|
||||
"/": {
|
||||
URL: &url.URL{Scheme: "http", Host: "100.64.0.5:8080"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://private.svc/", nil)
|
||||
req.Host = "private.svc"
|
||||
req.RemoteAddr = "100.64.0.99:55555" // different from the target
|
||||
req = req.WithContext(types.WithOverlayOrigin(req.Context()))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
rp.ServeHTTP(rec, req)
|
||||
|
||||
assert.NotEqual(t, http.StatusMisdirectedRequest, rec.Code,
|
||||
"overlay request with a non-matching source IP must not be flagged as a loop")
|
||||
}
|
||||
|
||||
// TestStampNetBirdIdentity_CapturedDataPresentButEmpty covers requests
|
||||
// that carry CapturedData with no identity fields populated (e.g. the
|
||||
// auth middleware ran but the request didn't authenticate). Both
|
||||
|
||||
@@ -213,11 +213,7 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se
|
||||
}).Debug("registered service with existing client")
|
||||
|
||||
if started && n.statusNotifier != nil {
|
||||
// Use a background context, not the caller's: the management
|
||||
// connection notification must land even if the request /
|
||||
// stream that triggered this registration is cancelled.
|
||||
// Mirrors the async runClientStartup path.
|
||||
if err := n.statusNotifier.NotifyStatus(context.Background(), accountID, serviceID, true); err != nil {
|
||||
if err := n.statusNotifier.NotifyStatus(ctx, accountID, serviceID, true); err != nil {
|
||||
n.logger.WithFields(log.Fields{
|
||||
"account_id": accountID,
|
||||
"service_key": key,
|
||||
@@ -246,10 +242,8 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se
|
||||
}).Info("created new client for account")
|
||||
|
||||
// Attempt to start the client in the background; if this fails we will
|
||||
// retry on the first request via RoundTrip. runClientStartup uses its
|
||||
// own background context so the caller's request-scoped ctx can't
|
||||
// cancel the inbound bring-up.
|
||||
go n.runClientStartup(accountID, entry.client)
|
||||
// retry on the first request via RoundTrip.
|
||||
go n.runClientStartup(ctx, accountID, entry.client)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -361,14 +355,8 @@ func (n *NetBird) createClientEntry(ctx context.Context, accountID types.Account
|
||||
}, nil
|
||||
}
|
||||
|
||||
// runClientStartup starts the client and notifies registered services on
|
||||
// success. This function runs in a goroutine launched from AddPeer, so it
|
||||
// must never inherit the caller's request-scoped context — a canceled
|
||||
// request must not abort the inbound listener bring-up or the management
|
||||
// status notification. The embedded client.Start gets its own bounded
|
||||
// startCtx; once Start succeeds, notifyClientReady takes over with a
|
||||
// fresh context.Background() (see that function for the contract).
|
||||
func (n *NetBird) runClientStartup(accountID types.AccountID, client *embed.Client) {
|
||||
// runClientStartup starts the client and notifies registered services on success.
|
||||
func (n *NetBird) runClientStartup(ctx context.Context, accountID types.AccountID, client *embed.Client) {
|
||||
startCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -381,17 +369,7 @@ func (n *NetBird) runClientStartup(accountID types.AccountID, client *embed.Clie
|
||||
return
|
||||
}
|
||||
|
||||
n.notifyClientReady(accountID, client)
|
||||
}
|
||||
|
||||
// notifyClientReady marks the account's client as started, fires the
|
||||
// readyHandler hook, and notifies management of the new tunnel
|
||||
// connection for every registered service. It is split out of
|
||||
// runClientStartup so a regression test can drive the post-Start tail
|
||||
// without needing a live embedded client. The contract that the
|
||||
// hooks/notifier see context.Background() — never the AddPeer caller's
|
||||
// ctx — lives here.
|
||||
func (n *NetBird) notifyClientReady(accountID types.AccountID, client *embed.Client) {
|
||||
// Mark client as started and collect services to notify outside the lock.
|
||||
n.clientsMux.Lock()
|
||||
entry, exists := n.clients[accountID]
|
||||
if exists {
|
||||
@@ -406,10 +384,8 @@ func (n *NetBird) notifyClientReady(accountID types.AccountID, client *embed.Cli
|
||||
readyHandler := n.readyHandler
|
||||
n.clientsMux.Unlock()
|
||||
|
||||
bgCtx := context.Background()
|
||||
|
||||
if readyHandler != nil {
|
||||
state := readyHandler(bgCtx, accountID, client)
|
||||
state := readyHandler(ctx, accountID, client)
|
||||
n.clientsMux.Lock()
|
||||
if e, ok := n.clients[accountID]; ok {
|
||||
e.inbound = state
|
||||
@@ -428,7 +404,7 @@ func (n *NetBird) notifyClientReady(accountID types.AccountID, client *embed.Cli
|
||||
return
|
||||
}
|
||||
for _, sn := range toNotify {
|
||||
if err := n.statusNotifier.NotifyStatus(bgCtx, accountID, sn.serviceID, true); err != nil {
|
||||
if err := n.statusNotifier.NotifyStatus(ctx, accountID, sn.serviceID, true); err != nil {
|
||||
n.logger.WithFields(log.Fields{
|
||||
"account_id": accountID,
|
||||
"service_key": sn.key,
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/netbirdio/netbird/client/embed"
|
||||
"github.com/netbirdio/netbird/proxy/internal/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
@@ -31,15 +30,12 @@ type statusCall struct {
|
||||
accountID types.AccountID
|
||||
serviceID types.ServiceID
|
||||
connected bool
|
||||
// ctx is captured so tests can assert the notifier received a
|
||||
// fresh background context rather than an inherited request ctx.
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (m *mockStatusNotifier) NotifyStatus(ctx context.Context, accountID types.AccountID, serviceID types.ServiceID, connected bool) error {
|
||||
func (m *mockStatusNotifier) NotifyStatus(_ context.Context, accountID types.AccountID, serviceID types.ServiceID, connected bool) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.statuses = append(m.statuses, statusCall{accountID, serviceID, connected, ctx})
|
||||
m.statuses = append(m.statuses, statusCall{accountID, serviceID, connected})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -299,12 +295,8 @@ func TestNetBird_AddPeer_ExistingStartedClient_NotifiesStatus(t *testing.T) {
|
||||
nb.clients[accountID].started = true
|
||||
nb.clientsMux.Unlock()
|
||||
|
||||
// Add second service with an already-cancelled caller context —
|
||||
// should notify immediately (client is started) AND the notification
|
||||
// must not inherit the cancelled ctx.
|
||||
cancelledCtx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
err = nb.AddPeer(cancelledCtx, accountID, "domain2.test", "key-1", types.ServiceID("svc-2"))
|
||||
// Add second service — should notify immediately since client is already started.
|
||||
err = nb.AddPeer(context.Background(), accountID, "domain2.test", "key-1", types.ServiceID("svc-2"))
|
||||
require.NoError(t, err)
|
||||
|
||||
calls := notifier.calls()
|
||||
@@ -312,9 +304,6 @@ func TestNetBird_AddPeer_ExistingStartedClient_NotifiesStatus(t *testing.T) {
|
||||
assert.Equal(t, accountID, calls[0].accountID)
|
||||
assert.Equal(t, types.ServiceID("svc-2"), calls[0].serviceID)
|
||||
assert.True(t, calls[0].connected)
|
||||
require.NotNil(t, calls[0].ctx, "NotifyStatus must receive a context")
|
||||
require.NoError(t, calls[0].ctx.Err(),
|
||||
"already-started NotifyStatus must use a background ctx, not the cancelled caller ctx")
|
||||
}
|
||||
|
||||
// TestNetBird_IdentityForIP_UnknownAccountReturnsFalse confirms that the
|
||||
@@ -371,53 +360,3 @@ func TestNetBird_RemovePeer_NotifiesDisconnection(t *testing.T) {
|
||||
assert.Equal(t, types.ServiceID("svc-1"), calls[0].serviceID)
|
||||
assert.False(t, calls[0].connected)
|
||||
}
|
||||
|
||||
// TestNotifyClientReady_UsesBackgroundCtx pins the contract that the
|
||||
// post-Start hooks (readyHandler + statusNotifier.NotifyStatus) run on
|
||||
// a fresh context.Background() rather than inheriting the AddPeer
|
||||
// caller's request- or stream-scoped ctx. Without this, a cancelled
|
||||
// caller ctx could abort the inbound listener bring-up or cause the
|
||||
// management status notification to fail spuriously and leave the
|
||||
// account in a half-connected state.
|
||||
func TestNotifyClientReady_UsesBackgroundCtx(t *testing.T) {
|
||||
notifier := &mockStatusNotifier{}
|
||||
nb := NewNetBird("test-proxy", "invalid.test", ClientConfig{
|
||||
MgmtAddr: "http://invalid.test:9999",
|
||||
}, nil, notifier, &mockMgmtClient{})
|
||||
|
||||
accountID := types.AccountID("acct-async")
|
||||
// Pre-populate a client entry so notifyClientReady has something
|
||||
// to mark started + something to enumerate for NotifyStatus.
|
||||
nb.clientsMux.Lock()
|
||||
nb.clients[accountID] = &clientEntry{
|
||||
services: map[ServiceKey]serviceInfo{
|
||||
DomainServiceKey("svc.example"): {serviceID: types.ServiceID("svc-1")},
|
||||
},
|
||||
}
|
||||
nb.clientsMux.Unlock()
|
||||
|
||||
var capturedReadyCtx context.Context
|
||||
nb.SetClientLifecycle(
|
||||
func(ctx context.Context, _ types.AccountID, _ *embed.Client) any {
|
||||
capturedReadyCtx = ctx
|
||||
return nil
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
// Drive the post-Start path directly; a real client.Start would
|
||||
// need a working management URL.
|
||||
nb.notifyClientReady(accountID, nil)
|
||||
|
||||
require.NotNil(t, capturedReadyCtx, "readyHandler must have been invoked")
|
||||
require.NoError(t, capturedReadyCtx.Err(),
|
||||
"readyHandler must receive a background context, not an inherited cancelled one")
|
||||
deadline, ok := capturedReadyCtx.Deadline()
|
||||
assert.False(t, ok, "readyHandler ctx must have no deadline (background); got %v", deadline)
|
||||
|
||||
calls := notifier.calls()
|
||||
require.Len(t, calls, 1, "NotifyStatus must be invoked once per registered service")
|
||||
require.NotNil(t, calls[0].ctx, "NotifyStatus must receive a context")
|
||||
require.NoError(t, calls[0].ctx.Err(),
|
||||
"NotifyStatus must receive a background context, not an inherited cancelled one")
|
||||
}
|
||||
|
||||
@@ -1781,14 +1781,11 @@ func TestRouter_PlainHTTP_RoutesToPlainChannel(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
tlsListener, ok := router.HTTPListener().(*chanListener)
|
||||
require.True(t, ok, "router.HTTPListener() must be the test's chanListener; the test relies on observing its channel directly")
|
||||
|
||||
select {
|
||||
case conn := <-acceptDone:
|
||||
require.NotNil(t, conn)
|
||||
_ = conn.Close()
|
||||
case <-tlsListener.ch:
|
||||
case <-router.HTTPListener().(*chanListener).ch:
|
||||
t.Fatal("plain HTTP request leaked into TLS channel")
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("plain HTTP connection never reached plain channel")
|
||||
|
||||
@@ -20,17 +20,14 @@ import (
|
||||
type Config struct {
|
||||
// ListenAddr is the TCP address the main listener binds. Required.
|
||||
ListenAddr string
|
||||
// ID identifies this proxy instance to management. Empty values are
|
||||
// replaced with a timestamped default at Server.Start time (see
|
||||
// initDefaults), not in New.
|
||||
// ID identifies this proxy instance to management. Empty value lets
|
||||
// New generate a timestamped default.
|
||||
ID string
|
||||
// Logger is the logrus logger used everywhere. Empty values fall
|
||||
// back to log.StandardLogger() at Server.Start time (see
|
||||
// initDefaults), not in New.
|
||||
// Logger is the logrus logger used everywhere. Empty value falls back
|
||||
// to log.StandardLogger().
|
||||
Logger *log.Logger
|
||||
// Version is the build version string reported to management. Empty
|
||||
// values are replaced with "dev" at Server.Start time (see
|
||||
// initDefaults), not in New.
|
||||
// becomes "dev".
|
||||
Version string
|
||||
// ProxyURL is the public address operators use to reach this proxy.
|
||||
ProxyURL string
|
||||
|
||||
@@ -281,7 +281,7 @@ func (s *Server) NotifyCertificateIssued(ctx context.Context, accountID types.Ac
|
||||
}
|
||||
|
||||
// inboundListenerProto resolves the per-account inbound listener state for
|
||||
// the SendStatusUpdate payload. Returns nil when --private is off
|
||||
// the SendStatusUpdate payload. Returns nil when --private-inbound is off
|
||||
// or the account has no live listener so management treats the field as
|
||||
// absent.
|
||||
func (s *Server) inboundListenerProto(accountID types.AccountID) *proto.ProxyInboundListener {
|
||||
@@ -528,7 +528,7 @@ func (s *Server) initManagementClient() error {
|
||||
}
|
||||
|
||||
// initNetBirdClient builds the multi-tenant embedded NetBird client used
|
||||
// for outbound RoundTripping and (when --private is on) per-account
|
||||
// for outbound RoundTripping and (when --private-inbound is on) per-account
|
||||
// inbound listeners.
|
||||
func (s *Server) initNetBirdClient() {
|
||||
s.netbird = roundtrip.NewNetBird(s.ID, s.ProxyURL, roundtrip.ClientConfig{
|
||||
|
||||
@@ -2,7 +2,6 @@ package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/url"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
@@ -34,12 +33,6 @@ func (a *ReverseProxyClustersAPI) List(ctx context.Context) ([]api.ProxyCluster,
|
||||
// NetBird cannot be deleted via this endpoint; the server returns 404 / 400
|
||||
// for cluster addresses the account does not own.
|
||||
func (a *ReverseProxyClustersAPI) Delete(ctx context.Context, clusterAddress string) error {
|
||||
// Guard against the empty input: url.PathEscape("") returns "" which
|
||||
// would collapse the request URL onto the collection endpoint and
|
||||
// silently delete nothing (or 405 depending on routing).
|
||||
if clusterAddress == "" {
|
||||
return errors.New("clusterAddress is required")
|
||||
}
|
||||
resp, err := a.c.NewRequest(ctx, "DELETE", "/api/reverse-proxies/clusters/"+url.PathEscape(clusterAddress), nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -88,17 +88,3 @@ func TestReverseProxyClusters_Delete_Err(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// TestReverseProxyClusters_Delete_EmptyAddress guards against an empty
|
||||
// clusterAddress reaching the wire — that would collapse the URL onto
|
||||
// the collection endpoint instead of a specific cluster. The client
|
||||
// must short-circuit with a typed error before any request is issued.
|
||||
func TestReverseProxyClusters_Delete_EmptyAddress(t *testing.T) {
|
||||
withMockClient(func(c *rest.Client, mux *http.ServeMux) {
|
||||
mux.HandleFunc("/api/reverse-proxies/clusters/", func(http.ResponseWriter, *http.Request) {
|
||||
t.Fatal("empty clusterAddress must be rejected client-side; no request should reach the server")
|
||||
})
|
||||
err := c.ReverseProxyClusters.Delete(context.Background(), "")
|
||||
assert.Error(t, err, "empty clusterAddress must surface as an error")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/url"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
@@ -62,12 +61,6 @@ func (a *ReverseProxyTokensAPI) Create(ctx context.Context, request api.ProxyTok
|
||||
// credentials existed; the plain secret can no longer authenticate any
|
||||
// new proxy registration.
|
||||
func (a *ReverseProxyTokensAPI) Delete(ctx context.Context, tokenID string) error {
|
||||
// Guard against the empty input: url.PathEscape("") returns "" which
|
||||
// would collapse the request URL onto the collection endpoint and
|
||||
// silently delete nothing (or 405 depending on routing).
|
||||
if tokenID == "" {
|
||||
return errors.New("tokenID is required")
|
||||
}
|
||||
resp, err := a.c.NewRequest(ctx, "DELETE", "/api/reverse-proxies/proxy-tokens/"+url.PathEscape(tokenID), nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -129,16 +129,3 @@ func TestReverseProxyTokens_Delete_Err(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// TestReverseProxyTokens_Delete_EmptyID guards against an empty tokenID
|
||||
// reaching the wire — url.PathEscape("") would collapse the URL onto
|
||||
// the collection endpoint.
|
||||
func TestReverseProxyTokens_Delete_EmptyID(t *testing.T) {
|
||||
withMockClient(func(c *rest.Client, mux *http.ServeMux) {
|
||||
mux.HandleFunc("/api/reverse-proxies/proxy-tokens/", func(http.ResponseWriter, *http.Request) {
|
||||
t.Fatal("empty tokenID must be rejected client-side; no request should reach the server")
|
||||
})
|
||||
err := c.ReverseProxyTokens.Delete(context.Background(), "")
|
||||
assert.Error(t, err, "empty tokenID must surface as an error")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3086,24 +3086,6 @@ components:
|
||||
- enabled
|
||||
- auth
|
||||
- meta
|
||||
allOf:
|
||||
# When private=true, access_groups must be present and non-empty,
|
||||
# and the service mode must be "http". The bearer-auth mutex is
|
||||
# enforced at the service-validation layer
|
||||
# (validatePrivateRequirements) because it sits in a nested
|
||||
# ServiceAuthConfig and isn't cleanly expressible here.
|
||||
- if:
|
||||
required: [private]
|
||||
properties:
|
||||
private:
|
||||
const: true
|
||||
then:
|
||||
required: [access_groups]
|
||||
properties:
|
||||
access_groups:
|
||||
minItems: 1
|
||||
mode:
|
||||
const: http
|
||||
ServiceMeta:
|
||||
type: object
|
||||
properties:
|
||||
@@ -3191,23 +3173,6 @@ components:
|
||||
- name
|
||||
- domain
|
||||
- enabled
|
||||
allOf:
|
||||
# Mirror of the Service conditional: when private=true the
|
||||
# request must carry a non-empty access_groups list and the
|
||||
# mode must be "http". The bearer-auth mutex is enforced at the
|
||||
# service-validation layer (validatePrivateRequirements).
|
||||
- if:
|
||||
required: [private]
|
||||
properties:
|
||||
private:
|
||||
const: true
|
||||
then:
|
||||
required: [access_groups]
|
||||
properties:
|
||||
access_groups:
|
||||
minItems: 1
|
||||
mode:
|
||||
const: http
|
||||
ServiceTargetOptions:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@@ -237,7 +237,7 @@ message SendStatusUpdateRequest {
|
||||
bool certificate_issued = 4;
|
||||
optional string error_message = 5;
|
||||
// Per-account inbound listener state for the account that owns
|
||||
// service_id. Populated only when --private is enabled and the
|
||||
// service_id. Populated only when --private-inbound is enabled and the
|
||||
// embedded client for the account is up. Field numbers >=50 reserved
|
||||
// for observability extensions.
|
||||
optional ProxyInboundListener inbound_listener = 50;
|
||||
|
||||
Reference in New Issue
Block a user