mirror of
https://github.com/netbirdio/netbird.git
synced 2026-07-02 04:39:55 +00:00
Compare commits
5 Commits
feature/ad
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d4736de55 | ||
|
|
06839a4731 | ||
|
|
eb422a5cd3 | ||
|
|
0aa0f7c76b | ||
|
|
7c0d8cbae0 |
9
.github/workflows/agent-network-e2e.yml
vendored
9
.github/workflows/agent-network-e2e.yml
vendored
@@ -1,10 +1,10 @@
|
||||
name: Agent Network E2E
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
# Nightly at 03:00 UTC, plus on demand from the Actions tab.
|
||||
schedule:
|
||||
- cron: "0 3 * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
@@ -13,7 +13,6 @@ concurrency:
|
||||
jobs:
|
||||
e2e:
|
||||
name: Agent Network E2E
|
||||
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 45
|
||||
steps:
|
||||
|
||||
@@ -803,15 +803,17 @@ func (conn *Conn) isConnectedOnAllWay() (status guard.ConnStatus) {
|
||||
}
|
||||
|
||||
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
|
||||
if !conn.wgWatcher.IsEnabled() {
|
||||
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
||||
conn.wgWatcherCancel = wgWatcherCancel
|
||||
conn.wgWatcherWg.Add(1)
|
||||
go func() {
|
||||
defer conn.wgWatcherWg.Done()
|
||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
|
||||
}()
|
||||
if !conn.wgWatcher.PrepareInitialHandshake() {
|
||||
return
|
||||
}
|
||||
|
||||
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
||||
conn.wgWatcherCancel = wgWatcherCancel
|
||||
conn.wgWatcherWg.Add(1)
|
||||
go func() {
|
||||
defer conn.wgWatcherWg.Done()
|
||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
|
||||
}()
|
||||
}
|
||||
|
||||
func (conn *Conn) disableWgWatcherIfNeeded() {
|
||||
|
||||
@@ -31,7 +31,9 @@ type WGWatcher struct {
|
||||
stateDump *stateDump
|
||||
|
||||
enabled bool
|
||||
muEnabled sync.RWMutex
|
||||
muEnabled sync.Mutex
|
||||
// initialHandshake is not thread-safe; never call PrepareInitialHandshake and EnableWgWatcher concurrently.
|
||||
initialHandshake time.Time
|
||||
|
||||
resetCh chan struct{}
|
||||
}
|
||||
@@ -46,38 +48,38 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
|
||||
}
|
||||
}
|
||||
|
||||
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
||||
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
|
||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
|
||||
// PrepareInitialHandshake reserves the watcher and reads the peer's current WireGuard
|
||||
// handshake time. It must be called before the peer is (re)configured on the WireGuard
|
||||
// interface, so the captured baseline reflects the state prior to this connection attempt
|
||||
// instead of racing with that configuration. Returns ok=false if the watcher is already
|
||||
// running, in which case EnableWgWatcher must not be called.
|
||||
func (w *WGWatcher) PrepareInitialHandshake() (ok bool) {
|
||||
w.muEnabled.Lock()
|
||||
if w.enabled {
|
||||
w.muEnabled.Unlock()
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
w.log.Debugf("enable WireGuard watcher")
|
||||
w.enabled = true
|
||||
w.muEnabled.Unlock()
|
||||
|
||||
initialHandshake, err := w.wgState()
|
||||
if err != nil {
|
||||
w.log.Warnf("failed to read initial wg stats: %v", err)
|
||||
}
|
||||
handshake, _ := w.wgState()
|
||||
w.initialHandshake = handshake
|
||||
return true
|
||||
}
|
||||
|
||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
|
||||
// EnableWgWatcher runs the WireGuard watcher loop using the handshake baseline captured by
|
||||
// PrepareInitialHandshake. The watcher runs until ctx is cancelled. Caller is responsible
|
||||
// for context lifecycle management.
|
||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
|
||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, w.initialHandshake)
|
||||
|
||||
w.muEnabled.Lock()
|
||||
w.enabled = false
|
||||
w.muEnabled.Unlock()
|
||||
}
|
||||
|
||||
// IsEnabled returns true if the WireGuard watcher is currently enabled
|
||||
func (w *WGWatcher) IsEnabled() bool {
|
||||
w.muEnabled.RLock()
|
||||
defer w.muEnabled.RUnlock()
|
||||
return w.enabled
|
||||
}
|
||||
|
||||
// Reset signals the watcher that the WireGuard peer has been reset and a new
|
||||
// handshake is expected. This restarts the handshake timeout from scratch.
|
||||
func (w *WGWatcher) Reset() {
|
||||
@@ -101,13 +103,16 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
|
||||
case <-timer.C:
|
||||
handshake, ok := w.handshakeCheck(lastHandshake)
|
||||
if !ok {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
onDisconnectedFn()
|
||||
return
|
||||
}
|
||||
if lastHandshake.IsZero() {
|
||||
elapsed := calcElapsed(enabledTime, *handshake)
|
||||
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
|
||||
if onHandshakeSuccessFn != nil {
|
||||
if onHandshakeSuccessFn != nil && ctx.Err() == nil {
|
||||
onHandshakeSuccessFn(*handshake)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
)
|
||||
@@ -34,6 +35,9 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ok := watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should not be enabled yet")
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
mlog.Infof("onDisconnectedFn")
|
||||
@@ -62,6 +66,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ok := watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should not be enabled yet")
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@@ -76,6 +83,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
ok = watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should be re-enabled after the previous run stopped")
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
onDisconnected <- struct{}{}
|
||||
|
||||
140
e2e/agentnetwork/skiptls_test.go
Normal file
140
e2e/agentnetwork/skiptls_test.go
Normal file
@@ -0,0 +1,140 @@
|
||||
//go:build e2e
|
||||
|
||||
package agentnetwork
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/netbirdio/netbird/e2e/harness"
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
)
|
||||
|
||||
// TestProviderSkipTLSVerification proves skip_tls_verification is per-provider:
|
||||
// two providers share one self-signed upstream, one skipping TLS verification
|
||||
// and one not. The skip=true provider's chat reaches the upstream and returns
|
||||
// 200; the skip=false provider's chat fails at the TLS handshake — same
|
||||
// upstream, opposite outcome. This is the behaviour a target-level flag could
|
||||
// not give, since all of an account's providers share one synthesised target.
|
||||
func TestProviderSkipTLSVerification(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
up, err := harness.StartFakeUpstream(ctx, srv)
|
||||
require.NoError(t, err, "start self-signed upstream")
|
||||
t.Cleanup(func() { _ = up.Terminate(context.Background()) })
|
||||
|
||||
grp, err := srv.API().Groups.Create(ctx, api.PostApiGroupsJSONRequestBody{Name: "e2e-skiptls"})
|
||||
require.NoError(t, err, "create group")
|
||||
t.Cleanup(func() { _ = srv.API().Groups.Delete(context.Background(), grp.Id) })
|
||||
|
||||
ephemeral := false
|
||||
sk, err := srv.API().SetupKeys.Create(ctx, api.PostApiSetupKeysJSONRequestBody{
|
||||
Name: "e2e-skiptls-client",
|
||||
Type: "reusable",
|
||||
ExpiresIn: 86400,
|
||||
UsageLimit: 0,
|
||||
AutoGroups: []string{grp.Id},
|
||||
Ephemeral: &ephemeral,
|
||||
})
|
||||
require.NoError(t, err, "mint setup key")
|
||||
require.NotEmpty(t, sk.Key, "setup key plaintext")
|
||||
|
||||
const (
|
||||
insecureModel = "insecure-model"
|
||||
secureModel = "secure-model"
|
||||
)
|
||||
|
||||
// Two providers on the SAME self-signed upstream, distinguished only by their
|
||||
// skip_tls_verification and a unique model string so the router picks each
|
||||
// unambiguously.
|
||||
newReq := func(name, model string, skip bool) api.AgentNetworkProviderRequest {
|
||||
key := "sk-dummy-e2e"
|
||||
return api.AgentNetworkProviderRequest{
|
||||
Name: name,
|
||||
ProviderId: "openai_api",
|
||||
UpstreamUrl: up.URL,
|
||||
ApiKey: &key,
|
||||
Enabled: ptr(true),
|
||||
SkipTlsVerification: ptr(skip),
|
||||
Models: &[]api.AgentNetworkProviderModel{
|
||||
{Id: model, InputPer1k: 0.001, OutputPer1k: 0.002},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// First create bootstraps the account cluster.
|
||||
insecureReq := newReq("skip-tls", insecureModel, true)
|
||||
insecureReq.BootstrapCluster = ptr(harness.AgentNetworkCluster)
|
||||
insecureProv, err := srv.CreateProvider(ctx, insecureReq)
|
||||
require.NoError(t, err, "create skip-tls provider")
|
||||
t.Cleanup(func() { _ = srv.DeleteProvider(context.Background(), insecureProv.Id) })
|
||||
require.True(t, insecureProv.SkipTlsVerification, "response must echo skip_tls_verification=true")
|
||||
|
||||
secureProv, err := srv.CreateProvider(ctx, newReq("verify-tls", secureModel, false))
|
||||
require.NoError(t, err, "create verify-tls provider")
|
||||
t.Cleanup(func() { _ = srv.DeleteProvider(context.Background(), secureProv.Id) })
|
||||
require.False(t, secureProv.SkipTlsVerification, "response must echo skip_tls_verification=false")
|
||||
|
||||
enabled := true
|
||||
pol, err := srv.CreatePolicy(ctx, api.AgentNetworkPolicyRequest{
|
||||
Name: "e2e-skiptls-allow",
|
||||
Enabled: &enabled,
|
||||
SourceGroups: []string{grp.Id},
|
||||
DestinationProviderIds: []string{insecureProv.Id, secureProv.Id},
|
||||
})
|
||||
require.NoError(t, err, "create policy")
|
||||
t.Cleanup(func() { _ = srv.DeletePolicy(context.Background(), pol.Id) })
|
||||
|
||||
settings, err := srv.GetSettings(ctx)
|
||||
require.NoError(t, err, "read settings")
|
||||
require.NotEmpty(t, settings.Endpoint, "endpoint must be assigned")
|
||||
|
||||
proxyToken, err := srv.CreateProxyTokenCLI(ctx, "e2e-skiptls-proxy")
|
||||
require.NoError(t, err, "mint proxy token")
|
||||
px, err := harness.StartProxy(ctx, srv, proxyToken)
|
||||
require.NoError(t, err, "start proxy")
|
||||
t.Cleanup(func() { _ = px.Terminate(context.Background()) })
|
||||
|
||||
cl, err := harness.StartClient(ctx, srv, sk.Key)
|
||||
require.NoError(t, err, "start client")
|
||||
t.Cleanup(func() { _ = cl.Terminate(context.Background()) })
|
||||
|
||||
require.NoError(t, cl.WaitConnected(ctx, 90*time.Second), "client must connect to management")
|
||||
if err := cl.WaitProxyPeer(ctx, 180*time.Second); err != nil {
|
||||
t.Fatalf("client did not see the proxy peer: %v\n=== proxy logs ===\n%s", err, px.Logs(context.Background()))
|
||||
}
|
||||
proxyIP, err := cl.ResolveProxyIP(ctx, settings.Endpoint)
|
||||
require.NoError(t, err, "resolve endpoint to proxy IP")
|
||||
|
||||
// Positive: skip=true reaches the self-signed upstream. Retry to absorb
|
||||
// tunnel/DNS jitter on the first call; success also proves the path works.
|
||||
var code int
|
||||
var body string
|
||||
deadline := time.Now().Add(90 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
c, b, cerr := cl.Chat(ctx, settings.Endpoint, proxyIP, harness.WireChat, insecureModel, "Reply with exactly: pong", "e2e-skiptls-insecure")
|
||||
if cerr == nil {
|
||||
code, body = c, b
|
||||
if code == 200 {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
require.Equal(t, 200, code,
|
||||
"skip_tls_verification=true must reach the self-signed upstream; body: %s\n=== upstream logs ===\n%s\n=== proxy logs ===\n%s",
|
||||
body, up.Logs(context.Background()), px.Logs(context.Background()))
|
||||
|
||||
// Negative: skip=false must fail the TLS handshake to the SAME upstream. The
|
||||
// path is already proven working, so a non-200 here is the cert rejection.
|
||||
secureCode, secureBody, cerr := cl.Chat(ctx, settings.Endpoint, proxyIP, harness.WireChat, secureModel, "Reply with exactly: pong", "e2e-skiptls-secure")
|
||||
require.NoError(t, cerr, "the chat call itself must complete (proxy returns an error status, not a transport error)")
|
||||
require.NotEqual(t, 200, secureCode,
|
||||
"skip_tls_verification=false must NOT reach the self-signed upstream; got %d, body: %s", secureCode, secureBody)
|
||||
require.GreaterOrEqual(t, secureCode, 500,
|
||||
"a TLS verification failure should surface as a 5xx from the proxy; got %d, body: %s", secureCode, secureBody)
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -108,9 +109,48 @@ func (cl *Client) WaitConnected(ctx context.Context, timeout time.Duration) erro
|
||||
return cl.pollStatus(ctx, timeout, "Management: Connected")
|
||||
}
|
||||
|
||||
// WaitProxyPeer polls until the client sees the proxy peer connected (1/1).
|
||||
// WaitProxyPeer polls until the client sees at least one connected peer — the
|
||||
// proxy serving the agent-network endpoint. It requires ">=1 connected" rather
|
||||
// than an exact "1/1" because proxy peers from earlier tests linger in the
|
||||
// account as disconnected (each proxy container registers a fresh WireGuard key
|
||||
// and the peer is not removed on teardown), so the count is e.g. "1/2". Only the
|
||||
// live proxy can be connected, and the caller's subsequent chat is the real
|
||||
// end-to-end assertion.
|
||||
func (cl *Client) WaitProxyPeer(ctx context.Context, timeout time.Duration) error {
|
||||
return cl.pollStatus(ctx, timeout, "1/1 Connected")
|
||||
deadline := time.Now().Add(timeout)
|
||||
var last string
|
||||
for time.Now().Before(deadline) {
|
||||
out, _ := cl.Status(ctx)
|
||||
last = out
|
||||
if connectedPeers(out) >= 1 {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
return fmt.Errorf("timed out waiting for a connected proxy peer; last status:\n%s", last)
|
||||
}
|
||||
|
||||
// connectedPeers parses the "Peers count: X/Y Connected" line from `netbird
|
||||
// status` and returns X (the connected count), or 0 when absent/unparseable.
|
||||
func connectedPeers(status string) int {
|
||||
for _, line := range strings.Split(status, "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
rest, ok := strings.CutPrefix(line, "Peers count:")
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
rest = strings.TrimSpace(rest)
|
||||
slash := strings.IndexByte(rest, '/')
|
||||
if slash <= 0 {
|
||||
return 0
|
||||
}
|
||||
n, err := strconv.Atoi(strings.TrimSpace(rest[:slash]))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return n
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (cl *Client) pollStatus(ctx context.Context, timeout time.Duration, want string) error {
|
||||
|
||||
107
e2e/harness/upstream.go
Normal file
107
e2e/harness/upstream.go
Normal file
@@ -0,0 +1,107 @@
|
||||
//go:build e2e
|
||||
|
||||
package harness
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/testcontainers/testcontainers-go"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
fakeUpstreamImage = "nginx:alpine"
|
||||
fakeUpstreamAlias = "fakeupstream"
|
||||
fakeUpstreamPort = "443/tcp"
|
||||
)
|
||||
|
||||
// fakeUpstreamNginxConf serves a canned OpenAI-shaped chat completion for any
|
||||
// path over a self-signed certificate, so the proxy reaches it only when the
|
||||
// provider opts into skipping TLS verification.
|
||||
const fakeUpstreamNginxConf = `pid /tmp/nginx.pid;
|
||||
events {}
|
||||
http {
|
||||
server {
|
||||
listen 443 ssl;
|
||||
ssl_certificate /certs/tls.crt;
|
||||
ssl_certificate_key /certs/tls.key;
|
||||
location / {
|
||||
default_type application/json;
|
||||
return 200 '{"id":"chatcmpl-e2e","object":"chat.completion","choices":[{"index":0,"message":{"role":"assistant","content":"pong"},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":1,"total_tokens":2}}';
|
||||
}
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
// FakeUpstream is a self-signed HTTPS server on the combined server's network,
|
||||
// used to exercise provider skip_tls_verification: a proxy that verifies the
|
||||
// certificate rejects it, one that skips verification reaches it.
|
||||
type FakeUpstream struct {
|
||||
container testcontainers.Container
|
||||
workDir string
|
||||
// URL is the upstream URL providers point at (https://<alias>).
|
||||
URL string
|
||||
}
|
||||
|
||||
// StartFakeUpstream runs the self-signed upstream on the shared network.
|
||||
func StartFakeUpstream(ctx context.Context, c *Combined) (*FakeUpstream, error) {
|
||||
workDir, err := os.MkdirTemp("/tmp", "nb-e2e-upstream-*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create upstream work dir: %w", err)
|
||||
}
|
||||
// Widen so the (non-root worker) nginx container can traverse the bind mount.
|
||||
if err := os.Chmod(workDir, 0o755); err != nil { //nolint:gosec // throwaway e2e cert dir
|
||||
return nil, fmt.Errorf("chmod upstream dir: %w", err)
|
||||
}
|
||||
if err := writeSelfSignedCert(workDir, []string{fakeUpstreamAlias}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(workDir, "nginx.conf"), []byte(fakeUpstreamNginxConf), 0o644); err != nil { //nolint:gosec // non-secret e2e config
|
||||
return nil, fmt.Errorf("write nginx conf: %w", err)
|
||||
}
|
||||
|
||||
req := testcontainers.ContainerRequest{
|
||||
Image: fakeUpstreamImage,
|
||||
ExposedPorts: []string{fakeUpstreamPort},
|
||||
Networks: []string{c.network.Name},
|
||||
NetworkAliases: map[string][]string{c.network.Name: {fakeUpstreamAlias}},
|
||||
Cmd: []string{"nginx", "-c", "/certs/nginx.conf", "-g", "daemon off;"},
|
||||
HostConfigModifier: func(hc *container.HostConfig) {
|
||||
hc.Binds = append(hc.Binds, workDir+":/certs:ro")
|
||||
},
|
||||
WaitingFor: wait.ForListeningPort(fakeUpstreamPort).WithStartupTimeout(60 * time.Second),
|
||||
}
|
||||
|
||||
ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||
ContainerRequest: req,
|
||||
Started: true,
|
||||
})
|
||||
if err != nil {
|
||||
_ = os.RemoveAll(workDir)
|
||||
return nil, fmt.Errorf("start fake upstream container: %w", err)
|
||||
}
|
||||
|
||||
return &FakeUpstream{container: ctr, workDir: workDir, URL: "https://" + fakeUpstreamAlias}, nil
|
||||
}
|
||||
|
||||
// Logs returns the upstream container logs, for diagnostics on failure.
|
||||
func (u *FakeUpstream) Logs(ctx context.Context) string {
|
||||
return containerLogs(ctx, u.container)
|
||||
}
|
||||
|
||||
// Terminate stops the upstream container and cleans its work dir.
|
||||
func (u *FakeUpstream) Terminate(ctx context.Context) error {
|
||||
var err error
|
||||
if u.container != nil {
|
||||
err = u.container.Terminate(ctx)
|
||||
}
|
||||
if u.workDir != "" {
|
||||
_ = os.RemoveAll(u.workDir)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -366,6 +366,10 @@ type routerProviderRoute struct {
|
||||
// + refreshes the OAuth token at request time instead of injecting a static
|
||||
// AuthHeaderValue.
|
||||
GCPServiceAccountKeyB64 string `json:"gcp_sa_key_b64,omitempty"`
|
||||
// SkipTLSVerify disables upstream TLS certificate verification when the
|
||||
// proxy dials this provider's upstream. For self-hosted / internal gateways
|
||||
// behind a private or self-signed certificate.
|
||||
SkipTLSVerify bool `json:"skip_tls_verify,omitempty"`
|
||||
}
|
||||
|
||||
// indexProviderGroups walks the enabled policies and returns, per
|
||||
@@ -450,6 +454,7 @@ func buildRouterConfigJSON(providers []*types.Provider, groupIndex map[string][]
|
||||
Vertex: catalog.IsVertexPathStyle(p.ProviderID),
|
||||
Bedrock: catalog.IsBedrockPathStyle(p.ProviderID),
|
||||
GCPServiceAccountKeyB64: gcpSAKeyB64,
|
||||
SkipTLSVerify: p.SkipTLSVerification,
|
||||
})
|
||||
}
|
||||
out, err := json.Marshal(cfg)
|
||||
|
||||
@@ -1057,6 +1057,41 @@ func TestSynthesizeServices_UpstreamURLPath_FlowsToRouter(t *testing.T) {
|
||||
"upstream path must be carried so the router can disambiguate same-model providers; trailing slash trimmed for stable string-prefix matching")
|
||||
}
|
||||
|
||||
func TestSynthesizeServices_SkipTLSVerification_FlowsToRouter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
mockStore := store.NewMockStore(ctrl)
|
||||
|
||||
// A provider fronting a self-hosted / internal gateway opts into skipping
|
||||
// upstream TLS verification; the synthesiser must carry it into the router
|
||||
// route so the proxy dials that upstream insecurely.
|
||||
provider := newSynthTestProvider()
|
||||
provider.SkipTLSVerification = true
|
||||
policy := newSynthTestPolicy(provider.ID, "grp-eng", "")
|
||||
|
||||
expectSynthBaseInputs(mockStore, ctx, newSynthTestSettings(),
|
||||
[]*types.Provider{provider},
|
||||
[]*types.Policy{policy},
|
||||
[]*types.Guardrail{})
|
||||
|
||||
services, err := SynthesizeServices(ctx, mockStore, testAccountID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, services, 1)
|
||||
|
||||
mws := services[0].Targets[0].Options.Middlewares
|
||||
var routerCfg routerConfig
|
||||
for _, m := range mws {
|
||||
if m.ID == middlewareIDLLMRouter {
|
||||
require.NoError(t, json.Unmarshal(m.ConfigJSON, &routerCfg))
|
||||
break
|
||||
}
|
||||
}
|
||||
require.Len(t, routerCfg.Providers, 1)
|
||||
assert.True(t, routerCfg.Providers[0].SkipTLSVerify,
|
||||
"provider skip_tls_verification must flow into the router route")
|
||||
}
|
||||
|
||||
func TestSynthesizeServices_UnknownProviderID_FailsClosed(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
ctrl := gomock.NewController(t)
|
||||
|
||||
@@ -46,6 +46,11 @@ type Provider struct {
|
||||
// Empty means all catalog models are allowed at catalog prices.
|
||||
Models []ProviderModel `gorm:"serializer:json"`
|
||||
Enabled bool
|
||||
// SkipTLSVerification disables upstream TLS certificate verification for
|
||||
// this provider's URL. For self-hosted / internal gateways fronted by a
|
||||
// private or self-signed certificate. The synthesiser propagates it into
|
||||
// the router route so the proxy dials that provider's upstream insecurely.
|
||||
SkipTLSVerification bool `gorm:"column:skip_tls_verification"`
|
||||
// SessionPrivateKey + SessionPublicKey are the ed25519 keypair the
|
||||
// synthesised reverse-proxy service uses to sign / verify session
|
||||
// JWTs after a successful OIDC handshake. Generated once on
|
||||
@@ -129,6 +134,9 @@ func (p *Provider) FromAPIRequest(req *api.AgentNetworkProviderRequest) {
|
||||
if req.Enabled != nil {
|
||||
p.Enabled = *req.Enabled
|
||||
}
|
||||
if req.SkipTlsVerification != nil {
|
||||
p.SkipTLSVerification = *req.SkipTlsVerification
|
||||
}
|
||||
// Identity-header overrides for catalogs flagged Customizable.
|
||||
// nil pointer = "field omitted on the wire" → leave the stored
|
||||
// value untouched (per the openapi description). Empty string is
|
||||
@@ -155,14 +163,15 @@ func (p *Provider) ToAPIResponse() *api.AgentNetworkProvider {
|
||||
created := p.CreatedAt
|
||||
updated := p.UpdatedAt
|
||||
resp := &api.AgentNetworkProvider{
|
||||
Id: p.ID,
|
||||
ProviderId: p.ProviderID,
|
||||
Name: p.Name,
|
||||
UpstreamUrl: p.UpstreamURL,
|
||||
Models: models,
|
||||
Enabled: p.Enabled,
|
||||
CreatedAt: &created,
|
||||
UpdatedAt: &updated,
|
||||
Id: p.ID,
|
||||
ProviderId: p.ProviderID,
|
||||
Name: p.Name,
|
||||
UpstreamUrl: p.UpstreamURL,
|
||||
Models: models,
|
||||
Enabled: p.Enabled,
|
||||
SkipTlsVerification: p.SkipTLSVerification,
|
||||
CreatedAt: &created,
|
||||
UpdatedAt: &updated,
|
||||
}
|
||||
if len(p.ExtraValues) > 0 {
|
||||
out := make(map[string]string, len(p.ExtraValues))
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
)
|
||||
|
||||
// TestProvider_SkipTLSVerification_RoundTrip covers the request→provider→
|
||||
// response mapping of skip_tls_verification, including the update semantics
|
||||
// (nil pointer preserves, explicit false clears).
|
||||
func TestProvider_SkipTLSVerification_RoundTrip(t *testing.T) {
|
||||
enable := true
|
||||
disable := false
|
||||
|
||||
base := func() *api.AgentNetworkProviderRequest {
|
||||
return &api.AgentNetworkProviderRequest{
|
||||
ProviderId: "openai_api",
|
||||
Name: "internal",
|
||||
UpstreamUrl: "https://gw.internal",
|
||||
}
|
||||
}
|
||||
|
||||
p := NewProvider("acc-1")
|
||||
|
||||
req := base()
|
||||
req.SkipTlsVerification = &enable
|
||||
p.FromAPIRequest(req)
|
||||
assert.True(t, p.SkipTLSVerification, "create with skip_tls_verification=true must set the field")
|
||||
assert.True(t, p.ToAPIResponse().SkipTlsVerification, "response must surface skip_tls_verification")
|
||||
|
||||
// Omitting the field on update leaves the stored value untouched.
|
||||
p.FromAPIRequest(base())
|
||||
assert.True(t, p.SkipTLSVerification, "omitting skip_tls_verification on update must preserve it")
|
||||
|
||||
// Explicit false clears it.
|
||||
req = base()
|
||||
req.SkipTlsVerification = &disable
|
||||
p.FromAPIRequest(req)
|
||||
assert.False(t, p.SkipTLSVerification, "explicit false must clear skip_tls_verification")
|
||||
assert.False(t, p.ToAPIResponse().SkipTlsVerification, "response must reflect the cleared value")
|
||||
}
|
||||
@@ -2057,6 +2057,7 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain, email, nam
|
||||
Extra: &types.ExtraSettings{
|
||||
UserApprovalRequired: true,
|
||||
},
|
||||
LazyConnectionEnabled: true,
|
||||
},
|
||||
Onboarding: types.AccountOnboarding{
|
||||
OnboardingFlowPending: true,
|
||||
|
||||
@@ -59,6 +59,10 @@ type ProviderRoute struct {
|
||||
// (instead of the static AuthHeaderValue) — so the gateway holds a durable
|
||||
// Vertex credential rather than a 1-hour token.
|
||||
GCPServiceAccountKeyB64 string `json:"gcp_sa_key_b64,omitempty"`
|
||||
// SkipTLSVerify disables upstream TLS certificate verification when dialing
|
||||
// this route's upstream. For self-hosted / internal gateways behind a
|
||||
// private or self-signed certificate.
|
||||
SkipTLSVerify bool `json:"skip_tls_verify,omitempty"`
|
||||
}
|
||||
|
||||
// Config is the on-wire configuration accepted by the factory. An
|
||||
|
||||
@@ -615,8 +615,9 @@ func (m *Middleware) allowWithRoute(route ProviderRoute, userGroups []string) *m
|
||||
// path is silently dropped and the gateway returns a 4xx for
|
||||
// the malformed URL. Empty value leaves the original
|
||||
// target's path untouched.
|
||||
Path: route.UpstreamPath,
|
||||
StripHeaders: append([]string(nil), strippedAuthHeaders...),
|
||||
Path: route.UpstreamPath,
|
||||
StripHeaders: append([]string(nil), strippedAuthHeaders...),
|
||||
SkipTLSVerify: route.SkipTLSVerify,
|
||||
}
|
||||
authValue := route.AuthHeaderValue
|
||||
if route.GCPServiceAccountKeyB64 != "" {
|
||||
|
||||
@@ -107,6 +107,41 @@ func TestRouter_HappyPath(t *testing.T) {
|
||||
assert.Equal(t, "allow", dec, "decision metadata must be allow on a match")
|
||||
}
|
||||
|
||||
func TestRouter_SkipTLSVerifyPropagates(t *testing.T) {
|
||||
base := ProviderRoute{
|
||||
ID: "internal-gw",
|
||||
Models: []string{"gpt-4o"},
|
||||
AllowedGroupIDs: []string{defaultTestGroup},
|
||||
UpstreamScheme: "https",
|
||||
UpstreamHost: "gateway.internal",
|
||||
AuthHeaderName: "Authorization",
|
||||
AuthHeaderValue: "Bearer sk-test-123",
|
||||
}
|
||||
|
||||
t.Run("enabled", func(t *testing.T) {
|
||||
route := base
|
||||
route.SkipTLSVerify = true
|
||||
mw := New(Config{Providers: []ProviderRoute{route}})
|
||||
|
||||
out, err := mw.Invoke(context.Background(), newInputWithModel("gpt-4o"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out.Mutations, "matched route must emit mutations")
|
||||
require.NotNil(t, out.Mutations.RewriteUpstream, "matched route must emit upstream rewrite")
|
||||
assert.True(t, out.Mutations.RewriteUpstream.SkipTLSVerify,
|
||||
"skip_tls_verify on the route must ride on the upstream rewrite")
|
||||
})
|
||||
|
||||
t.Run("default off", func(t *testing.T) {
|
||||
mw := New(Config{Providers: []ProviderRoute{base}})
|
||||
|
||||
out, err := mw.Invoke(context.Background(), newInputWithModel("gpt-4o"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out.Mutations.RewriteUpstream, "matched route must emit upstream rewrite")
|
||||
assert.False(t, out.Mutations.RewriteUpstream.SkipTLSVerify,
|
||||
"skip_tls_verify must default to false when the route does not set it")
|
||||
})
|
||||
}
|
||||
|
||||
func TestRouter_MissingModel(t *testing.T) {
|
||||
mw := New(Config{Providers: []ProviderRoute{{
|
||||
ID: "openai-prod",
|
||||
|
||||
@@ -243,6 +243,10 @@ type UpstreamRewrite struct {
|
||||
StripPathPrefix string
|
||||
AuthHeader *AuthHeader
|
||||
StripHeaders []string
|
||||
// SkipTLSVerify, when true, makes the proxy dial the rewritten upstream
|
||||
// without verifying its TLS certificate. Set by llm_router from the
|
||||
// provider's skip_tls_verification for self-hosted / internal gateways.
|
||||
SkipTLSVerify bool
|
||||
}
|
||||
|
||||
// AuthHeader is a single name/value pair the proxy injects on the
|
||||
|
||||
@@ -346,6 +346,11 @@ func (p *ReverseProxy) forwardUpstream(respWriter http.ResponseWriter, r *http.R
|
||||
r.Host = effectiveURL.Host
|
||||
applyUpstreamHeaders(r, upstreamRewrite)
|
||||
stripUpstreamPathPrefix(r, upstreamRewrite.StripPathPrefix)
|
||||
// A router-selected route (e.g. agent-network provider) can opt into
|
||||
// skipping upstream TLS verification per its provider config.
|
||||
if upstreamRewrite.SkipTLSVerify {
|
||||
ctx = roundtrip.WithSkipTLSVerify(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
rp := &httputil.ReverseProxy{
|
||||
|
||||
@@ -536,7 +536,7 @@ func (c *GrpcClient) IsHealthy() bool {
|
||||
ctx, cancel := context.WithTimeout(c.ctx, healthCheckTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := c.realClient.GetServerKey(ctx, &proto.Empty{})
|
||||
_, err := c.realClient.IsHealthy(ctx, &proto.Empty{})
|
||||
if err != nil {
|
||||
c.notifyDisconnected(err)
|
||||
log.Warnf("health check returned: %s", err)
|
||||
|
||||
@@ -5119,6 +5119,10 @@ components:
|
||||
type: boolean
|
||||
description: Whether the provider is enabled.
|
||||
example: true
|
||||
skip_tls_verification:
|
||||
type: boolean
|
||||
description: Whether upstream TLS certificate verification is skipped when the proxy dials this provider's URL. Intended for self-hosted / internal gateways behind a private or self-signed certificate.
|
||||
example: false
|
||||
created_at:
|
||||
type: string
|
||||
format: date-time
|
||||
@@ -5138,6 +5142,7 @@ components:
|
||||
- upstream_url
|
||||
- models
|
||||
- enabled
|
||||
- skip_tls_verification
|
||||
- created_at
|
||||
- updated_at
|
||||
AgentNetworkProviderRequest:
|
||||
@@ -5190,6 +5195,10 @@ components:
|
||||
type: boolean
|
||||
description: Whether the provider is enabled. Defaults to true on create.
|
||||
example: true
|
||||
skip_tls_verification:
|
||||
type: boolean
|
||||
description: Skip upstream TLS certificate verification when the proxy dials this provider's URL. For self-hosted / internal gateways behind a private or self-signed certificate. Defaults to false. When omitted on update, the stored value is left unchanged.
|
||||
example: false
|
||||
required:
|
||||
- provider_id
|
||||
- name
|
||||
|
||||
@@ -2224,6 +2224,9 @@ type AgentNetworkProvider struct {
|
||||
// ProviderId Catalog identifier for the upstream AI provider (e.g. openai_api, anthropic_api, azure_openai_api, bedrock_api, vertex_ai_api, mistral_api, custom).
|
||||
ProviderId string `json:"provider_id"`
|
||||
|
||||
// SkipTlsVerification Whether upstream TLS certificate verification is skipped when the proxy dials this provider's URL. Intended for self-hosted / internal gateways behind a private or self-signed certificate.
|
||||
SkipTlsVerification bool `json:"skip_tls_verification"`
|
||||
|
||||
// UpdatedAt Timestamp when the provider was last updated.
|
||||
UpdatedAt *time.Time `json:"updated_at,omitempty"`
|
||||
|
||||
@@ -2272,6 +2275,9 @@ type AgentNetworkProviderRequest struct {
|
||||
// ProviderId Catalog identifier for the upstream AI provider (e.g. openai_api, anthropic_api, azure_openai_api, bedrock_api, vertex_ai_api, mistral_api, custom).
|
||||
ProviderId string `json:"provider_id"`
|
||||
|
||||
// SkipTlsVerification Skip upstream TLS certificate verification when the proxy dials this provider's URL. For self-hosted / internal gateways behind a private or self-signed certificate. Defaults to false. When omitted on update, the stored value is left unchanged.
|
||||
SkipTlsVerification *bool `json:"skip_tls_verification,omitempty"`
|
||||
|
||||
// UpstreamUrl Full upstream URL (with scheme) that NetBird forwards traffic to.
|
||||
UpstreamUrl string `json:"upstream_url"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user