Compare commits

..

5 Commits

Author SHA1 Message Date
Zoltan Papp
f425870c8e [client] Avoid duplicated agent close (#4383) 2025-08-20 18:50:51 +02:00
Pascal Fischer
f9d64a06c2 [management] Remove all store locks from grpc side (#4374) 2025-08-20 12:41:14 +02:00
hakansa
86555c44f7 refactor doc workflow (#4373)
refactor doc workflow (#4373)
2025-08-20 10:59:32 +03:00
Bastien Jeannelle
48792c64cd [misc] Fix confusing comment (#4376) 2025-08-20 00:12:00 +02:00
hakansa
533d93eb17 [management,client] Feat/exit node auto apply (#4272)
[management,client] Feat/exit node auto apply (#4272)
2025-08-19 18:19:24 +03:00
52 changed files with 1427 additions and 3211 deletions

View File

@@ -16,19 +16,29 @@ jobs:
steps:
- name: Read PR body
id: body
shell: bash
run: |
BODY=$(jq -r '.pull_request.body // ""' "$GITHUB_EVENT_PATH")
echo "body<<EOF" >> $GITHUB_OUTPUT
echo "$BODY" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
set -euo pipefail
BODY_B64=$(jq -r '.pull_request.body // "" | @base64' "$GITHUB_EVENT_PATH")
{
echo "body_b64=$BODY_B64"
} >> "$GITHUB_OUTPUT"
- name: Validate checkbox selection
id: validate
shell: bash
env:
BODY_B64: ${{ steps.body.outputs.body_b64 }}
run: |
body='${{ steps.body.outputs.body }}'
set -euo pipefail
if ! body="$(printf '%s' "$BODY_B64" | base64 -d)"; then
echo "::error::Failed to decode PR body from base64. Data may be corrupted or missing."
exit 1
fi
added_checked=$(printf '%s' "$body" | grep -Ei '^[[:space:]]*-\s*\[x\]\s*I added/updated documentation' | wc -l | tr -d '[:space:]' || true)
noneed_checked=$(printf '%s' "$body" | grep -Ei '^[[:space:]]*-\s*\[x\]\s*Documentation is \*\*not needed\*\*' | wc -l | tr -d '[:space:]' || true)
added_checked=$(printf "%s" "$body" | grep -E '^- \[x\] I added/updated documentation' -i | wc -l | tr -d ' ')
noneed_checked=$(printf "%s" "$body" | grep -E '^- \[x\] Documentation is \*\*not needed\*\*' -i | wc -l | tr -d ' ')
if [ "$added_checked" -eq 1 ] && [ "$noneed_checked" -eq 1 ]; then
echo "::error::Choose exactly one: either 'docs added' OR 'not needed'."
@@ -41,30 +51,35 @@ jobs:
fi
if [ "$added_checked" -eq 1 ]; then
echo "mode=added" >> $GITHUB_OUTPUT
echo "mode=added" >> "$GITHUB_OUTPUT"
else
echo "mode=noneed" >> $GITHUB_OUTPUT
echo "mode=noneed" >> "$GITHUB_OUTPUT"
fi
- name: Extract docs PR URL (when 'docs added')
if: steps.validate.outputs.mode == 'added'
id: extract
shell: bash
env:
BODY_B64: ${{ steps.body.outputs.body_b64 }}
run: |
body='${{ steps.body.outputs.body }}'
set -euo pipefail
body="$(printf '%s' "$BODY_B64" | base64 -d)"
# Strictly require HTTPS and that it's a PR in netbirdio/docs
# Examples accepted:
# https://github.com/netbirdio/docs/pull/1234
url=$(printf "%s" "$body" | grep -Eo 'https://github\.com/netbirdio/docs/pull/[0-9]+' | head -n1 || true)
# e.g., https://github.com/netbirdio/docs/pull/1234
url="$(printf '%s' "$body" | grep -Eo 'https://github\.com/netbirdio/docs/pull/[0-9]+' | head -n1 || true)"
if [ -z "$url" ]; then
if [ -z "${url:-}" ]; then
echo "::error::You checked 'docs added' but didn't include a valid HTTPS PR link to netbirdio/docs (e.g., https://github.com/netbirdio/docs/pull/1234)."
exit 1
fi
pr_number=$(echo "$url" | sed -E 's#.*/pull/([0-9]+)$#\1#')
echo "url=$url" >> $GITHUB_OUTPUT
echo "pr_number=$pr_number" >> $GITHUB_OUTPUT
pr_number="$(printf '%s' "$url" | sed -E 's#.*/pull/([0-9]+)$#\1#')"
{
echo "url=$url"
echo "pr_number=$pr_number"
} >> "$GITHUB_OUTPUT"
- name: Verify docs PR exists (and is open or merged)
if: steps.validate.outputs.mode == 'added'

View File

@@ -86,7 +86,6 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
@@ -107,13 +106,13 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
Return(&types.Settings{}, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}

View File

@@ -22,8 +22,6 @@ import (
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
nberrors "github.com/netbirdio/netbird/client/errors"
@@ -55,7 +53,6 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
@@ -65,9 +62,7 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -130,8 +125,6 @@ type EngineConfig struct {
BlockInbound bool
LazyConnectionEnabled bool
peerDaemonAddr string
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -468,7 +461,6 @@ func (e *Engine) Start() error {
e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveJobEvents()
// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
@@ -894,101 +886,6 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}
return conn, nil
}
func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
},
},
}
})
if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
e.clientCancel()
return
}
log.Debugf("stopped receiving jobs from Management Service")
}()
log.Debugf("connecting to Management Service jobs stream")
}
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}
if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}
func (e *Engine) getStatusOutput(anon bool) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
@@ -1214,15 +1111,16 @@ func toRoutes(protoRoutes []*mgmProto.Route) []*route.Route {
}
convertedRoute := &route.Route{
ID: route.ID(protoRoute.ID),
Network: prefix.Masked(),
Domains: domain.FromPunycodeList(protoRoute.Domains),
NetID: route.NetID(protoRoute.NetID),
NetworkType: route.NetworkType(protoRoute.NetworkType),
Peer: protoRoute.Peer,
Metric: int(protoRoute.Metric),
Masquerade: protoRoute.Masquerade,
KeepRoute: protoRoute.KeepRoute,
ID: route.ID(protoRoute.ID),
Network: prefix.Masked(),
Domains: domain.FromPunycodeList(protoRoute.Domains),
NetID: route.NetID(protoRoute.NetID),
NetworkType: route.NetworkType(protoRoute.NetworkType),
Peer: protoRoute.Peer,
Metric: int(protoRoute.Metric),
Masquerade: protoRoute.Masquerade,
KeepRoute: protoRoute.KeepRoute,
SkipAutoApply: protoRoute.SkipAutoApply,
}
routes = append(routes, convertedRoute)
}

View File

@@ -1544,7 +1544,6 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
@@ -1569,13 +1568,13 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

View File

@@ -1,6 +1,7 @@
package ice
import (
"sync"
"time"
"github.com/pion/ice/v3"
@@ -23,7 +24,20 @@ const (
iceRelayAcceptanceMinWaitDefault = 2 * time.Second
)
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ice.Agent, error) {
type ThreadSafeAgent struct {
*ice.Agent
once sync.Once
}
func (a *ThreadSafeAgent) Close() error {
var err error
a.once.Do(func() {
err = a.Agent.Close()
})
return err
}
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) {
iceKeepAlive := iceKeepAlive()
iceDisconnectedTimeout := iceDisconnectedTimeout()
iceFailedTimeout := iceFailedTimeout()
@@ -61,7 +75,12 @@ func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candida
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
}
return ice.NewAgent(agentConfig)
agent, err := ice.NewAgent(agentConfig)
if err != nil {
return nil, err
}
return &ThreadSafeAgent{Agent: agent}, nil
}
func GenerateICECredentials() (string, string, error) {

View File

@@ -42,7 +42,7 @@ type WorkerICE struct {
statusRecorder *Status
hasRelayOnLocally bool
agent *ice.Agent
agent *icemaker.ThreadSafeAgent
agentDialerCancel context.CancelFunc
agentConnecting bool // while it is true, drop all incoming offers
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
@@ -121,6 +121,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.agent = nil
// todo consider to switch to Relay connection while establishing a new ICE connection
}
@@ -195,7 +196,7 @@ func (w *WorkerICE) Close() {
w.agent = nil
}
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) {
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*icemaker.ThreadSafeAgent, error) {
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
if err != nil {
return nil, fmt.Errorf("create agent: %w", err)
@@ -230,7 +231,7 @@ func (w *WorkerICE) SessionID() ICESessionID {
// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) {
func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("gather candidates")
if err := agent.GatherCandidates(); err != nil {
w.log.Warnf("failed to gather candidates: %s", err)
@@ -239,7 +240,7 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
}
w.log.Debugf("turn agent dial")
remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer)
remoteConn, err := w.turnAgentDial(ctx, agent, remoteOfferAnswer)
if err != nil {
w.log.Debugf("failed to dial the remote peer: %s", err)
w.closeAgent(agent, w.agentDialerCancel)
@@ -290,13 +291,14 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}
func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) {
func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) {
cancel()
if err := agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.muxAgent.Lock()
// todo review does it make sense to generate new session ID all the time when w.agent==agent
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
@@ -379,7 +381,7 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat
w.config.Key)
}
func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
return func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
switch state {
@@ -412,12 +414,12 @@ func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool
return false
}
func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := w.config.LocalKey > w.config.Key
if isControlling {
return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
return agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else {
return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
return agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
}
}

View File

@@ -36,8 +36,8 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager/vars"
"github.com/netbirdio/netbird/client/internal/routeselector"
"github.com/netbirdio/netbird/client/internal/statemanager"
relayClient "github.com/netbirdio/netbird/shared/relay/client"
"github.com/netbirdio/netbird/route"
relayClient "github.com/netbirdio/netbird/shared/relay/client"
nbnet "github.com/netbirdio/netbird/util/net"
"github.com/netbirdio/netbird/version"
)
@@ -368,7 +368,11 @@ func (m *DefaultManager) UpdateRoutes(
var merr *multierror.Error
if !m.disableClientRoutes {
filteredClientRoutes := m.routeSelector.FilterSelected(clientRoutes)
// Update route selector based on management server's isSelected status
m.updateRouteSelectorFromManagement(clientRoutes)
filteredClientRoutes := m.routeSelector.FilterSelectedExitNodes(clientRoutes)
if err := m.updateSystemRoutes(filteredClientRoutes); err != nil {
merr = multierror.Append(merr, fmt.Errorf("update system routes: %w", err))
@@ -430,7 +434,7 @@ func (m *DefaultManager) TriggerSelection(networks route.HAMap) {
m.mux.Lock()
defer m.mux.Unlock()
networks = m.routeSelector.FilterSelected(networks)
networks = m.routeSelector.FilterSelectedExitNodes(networks)
m.notifier.OnNewRoutes(networks)
@@ -583,3 +587,106 @@ func resolveURLsToIPs(urls []string) []net.IP {
}
return ips
}
// updateRouteSelectorFromManagement updates the route selector based on the isSelected status from the management server
func (m *DefaultManager) updateRouteSelectorFromManagement(clientRoutes route.HAMap) {
exitNodeInfo := m.collectExitNodeInfo(clientRoutes)
if len(exitNodeInfo.allIDs) == 0 {
return
}
m.updateExitNodeSelections(exitNodeInfo)
m.logExitNodeUpdate(exitNodeInfo)
}
type exitNodeInfo struct {
allIDs []route.NetID
selectedByManagement []route.NetID
userSelected []route.NetID
userDeselected []route.NetID
}
func (m *DefaultManager) collectExitNodeInfo(clientRoutes route.HAMap) exitNodeInfo {
var info exitNodeInfo
for haID, routes := range clientRoutes {
if !m.isExitNodeRoute(routes) {
continue
}
netID := haID.NetID()
info.allIDs = append(info.allIDs, netID)
if m.routeSelector.HasUserSelectionForRoute(netID) {
m.categorizeUserSelection(netID, &info)
} else {
m.checkManagementSelection(routes, netID, &info)
}
}
return info
}
func (m *DefaultManager) isExitNodeRoute(routes []*route.Route) bool {
return len(routes) > 0 && routes[0].Network.String() == vars.ExitNodeCIDR
}
func (m *DefaultManager) categorizeUserSelection(netID route.NetID, info *exitNodeInfo) {
if m.routeSelector.IsSelected(netID) {
info.userSelected = append(info.userSelected, netID)
} else {
info.userDeselected = append(info.userDeselected, netID)
}
}
func (m *DefaultManager) checkManagementSelection(routes []*route.Route, netID route.NetID, info *exitNodeInfo) {
for _, route := range routes {
if !route.SkipAutoApply {
info.selectedByManagement = append(info.selectedByManagement, netID)
break
}
}
}
func (m *DefaultManager) updateExitNodeSelections(info exitNodeInfo) {
routesToDeselect := m.getRoutesToDeselect(info.allIDs)
m.deselectExitNodes(routesToDeselect)
m.selectExitNodesByManagement(info.selectedByManagement, info.allIDs)
}
func (m *DefaultManager) getRoutesToDeselect(allIDs []route.NetID) []route.NetID {
var routesToDeselect []route.NetID
for _, netID := range allIDs {
if !m.routeSelector.HasUserSelectionForRoute(netID) {
routesToDeselect = append(routesToDeselect, netID)
}
}
return routesToDeselect
}
func (m *DefaultManager) deselectExitNodes(routesToDeselect []route.NetID) {
if len(routesToDeselect) == 0 {
return
}
err := m.routeSelector.DeselectRoutes(routesToDeselect, routesToDeselect)
if err != nil {
log.Warnf("Failed to deselect exit nodes: %v", err)
}
}
func (m *DefaultManager) selectExitNodesByManagement(selectedByManagement []route.NetID, allIDs []route.NetID) {
if len(selectedByManagement) == 0 {
return
}
err := m.routeSelector.SelectRoutes(selectedByManagement, true, allIDs)
if err != nil {
log.Warnf("Failed to select exit nodes: %v", err)
}
}
func (m *DefaultManager) logExitNodeUpdate(info exitNodeInfo) {
log.Debugf("Updated route selector: %d exit nodes available, %d selected by management, %d user-selected, %d user-deselected",
len(info.allIDs), len(info.selectedByManagement), len(info.userSelected), len(info.userDeselected))
}

View File

@@ -190,14 +190,15 @@ func TestManagerUpdateRoutes(t *testing.T) {
name: "No Small Client Route Should Be Added",
inputRoutes: []*route.Route{
{
ID: "a",
NetID: "routeA",
Peer: remotePeerKey1,
Network: netip.MustParsePrefix("0.0.0.0/0"),
NetworkType: route.IPv4Network,
Metric: 9999,
Masquerade: false,
Enabled: true,
ID: "a",
NetID: "routeA",
Peer: remotePeerKey1,
Network: netip.MustParsePrefix("0.0.0.0/0"),
NetworkType: route.IPv4Network,
Metric: 9999,
Masquerade: false,
Enabled: true,
SkipAutoApply: false,
},
},
inputSerial: 1,

View File

@@ -13,4 +13,6 @@ var (
Defaultv4 = netip.PrefixFrom(netip.IPv4Unspecified(), 0)
Defaultv6 = netip.PrefixFrom(netip.IPv6Unspecified(), 0)
ExitNodeCIDR = "0.0.0.0/0"
)

View File

@@ -9,19 +9,27 @@ import (
"github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/route"
)
const (
exitNodeCIDR = "0.0.0.0/0"
)
type RouteSelector struct {
mu sync.RWMutex
deselectedRoutes map[route.NetID]struct{}
selectedRoutes map[route.NetID]struct{}
deselectAll bool
}
func NewRouteSelector() *RouteSelector {
return &RouteSelector{
deselectedRoutes: map[route.NetID]struct{}{},
selectedRoutes: map[route.NetID]struct{}{},
deselectAll: false,
}
}
@@ -32,7 +40,14 @@ func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, al
defer rs.mu.Unlock()
if !appendRoute || rs.deselectAll {
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
for _, r := range allRoutes {
rs.deselectedRoutes[r] = struct{}{}
}
@@ -45,6 +60,7 @@ func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, al
continue
}
delete(rs.deselectedRoutes, route)
rs.selectedRoutes[route] = struct{}{}
}
rs.deselectAll = false
@@ -58,7 +74,14 @@ func (rs *RouteSelector) SelectAllRoutes() {
defer rs.mu.Unlock()
rs.deselectAll = false
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
}
// DeselectRoutes removes specific routes from the selection.
@@ -77,6 +100,7 @@ func (rs *RouteSelector) DeselectRoutes(routes []route.NetID, allRoutes []route.
continue
}
rs.deselectedRoutes[route] = struct{}{}
delete(rs.selectedRoutes, route)
}
return errors.FormatErrorOrNil(err)
@@ -88,7 +112,14 @@ func (rs *RouteSelector) DeselectAllRoutes() {
defer rs.mu.Unlock()
rs.deselectAll = true
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
}
// IsSelected checks if a specific route is selected.
@@ -97,11 +128,14 @@ func (rs *RouteSelector) IsSelected(routeID route.NetID) bool {
defer rs.mu.RUnlock()
if rs.deselectAll {
log.Debugf("Route %s not selected (deselect all)", routeID)
return false
}
_, deselected := rs.deselectedRoutes[routeID]
return !deselected
isSelected := !deselected
log.Debugf("Route %s selection status: %v (deselected: %v)", routeID, isSelected, deselected)
return isSelected
}
// FilterSelected removes unselected routes from the provided map.
@@ -124,15 +158,98 @@ func (rs *RouteSelector) FilterSelected(routes route.HAMap) route.HAMap {
return filtered
}
// HasUserSelectionForRoute returns true if the user has explicitly selected or deselected this specific route
func (rs *RouteSelector) HasUserSelectionForRoute(routeID route.NetID) bool {
rs.mu.RLock()
defer rs.mu.RUnlock()
_, selected := rs.selectedRoutes[routeID]
_, deselected := rs.deselectedRoutes[routeID]
return selected || deselected
}
func (rs *RouteSelector) FilterSelectedExitNodes(routes route.HAMap) route.HAMap {
rs.mu.RLock()
defer rs.mu.RUnlock()
if rs.deselectAll {
return route.HAMap{}
}
filtered := make(route.HAMap, len(routes))
for id, rt := range routes {
netID := id.NetID()
if rs.isDeselected(netID) {
continue
}
if !isExitNode(rt) {
filtered[id] = rt
continue
}
rs.applyExitNodeFilter(id, netID, rt, filtered)
}
return filtered
}
func (rs *RouteSelector) isDeselected(netID route.NetID) bool {
_, deselected := rs.deselectedRoutes[netID]
return deselected || rs.deselectAll
}
func isExitNode(rt []*route.Route) bool {
return len(rt) > 0 && rt[0].Network.String() == exitNodeCIDR
}
func (rs *RouteSelector) applyExitNodeFilter(
id route.HAUniqueID,
netID route.NetID,
rt []*route.Route,
out route.HAMap,
) {
if rs.hasUserSelections() {
// user made explicit selects/deselects
if rs.IsSelected(netID) {
out[id] = rt
}
return
}
// no explicit selections: only include routes marked !SkipAutoApply (=AutoApply)
sel := collectSelected(rt)
if len(sel) > 0 {
out[id] = sel
}
}
func (rs *RouteSelector) hasUserSelections() bool {
return len(rs.selectedRoutes) > 0 || len(rs.deselectedRoutes) > 0
}
func collectSelected(rt []*route.Route) []*route.Route {
var sel []*route.Route
for _, r := range rt {
if !r.SkipAutoApply {
sel = append(sel, r)
}
}
return sel
}
// MarshalJSON implements the json.Marshaler interface
func (rs *RouteSelector) MarshalJSON() ([]byte, error) {
rs.mu.RLock()
defer rs.mu.RUnlock()
return json.Marshal(struct {
SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"`
DeselectedRoutes map[route.NetID]struct{} `json:"deselected_routes"`
DeselectAll bool `json:"deselect_all"`
}{
SelectedRoutes: rs.selectedRoutes,
DeselectedRoutes: rs.deselectedRoutes,
DeselectAll: rs.deselectAll,
})
@@ -147,11 +264,13 @@ func (rs *RouteSelector) UnmarshalJSON(data []byte) error {
// Check for null or empty JSON
if len(data) == 0 || string(data) == "null" {
rs.deselectedRoutes = map[route.NetID]struct{}{}
rs.selectedRoutes = map[route.NetID]struct{}{}
rs.deselectAll = false
return nil
}
var temp struct {
SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"`
DeselectedRoutes map[route.NetID]struct{} `json:"deselected_routes"`
DeselectAll bool `json:"deselect_all"`
}
@@ -160,12 +279,16 @@ func (rs *RouteSelector) UnmarshalJSON(data []byte) error {
return err
}
rs.selectedRoutes = temp.SelectedRoutes
rs.deselectedRoutes = temp.DeselectedRoutes
rs.deselectAll = temp.DeselectAll
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
return nil
}

View File

@@ -1,6 +1,7 @@
package routeselector_test
import (
"net/netip"
"slices"
"testing"
@@ -273,6 +274,62 @@ func TestRouteSelector_FilterSelected(t *testing.T) {
}, filtered)
}
func TestRouteSelector_FilterSelectedExitNodes(t *testing.T) {
rs := routeselector.NewRouteSelector()
// Create test routes
exitNode1 := &route.Route{
ID: "route1",
NetID: "net1",
Network: netip.MustParsePrefix("0.0.0.0/0"),
Peer: "peer1",
SkipAutoApply: false,
}
exitNode2 := &route.Route{
ID: "route2",
NetID: "net1",
Network: netip.MustParsePrefix("0.0.0.0/0"),
Peer: "peer2",
SkipAutoApply: true,
}
normalRoute := &route.Route{
ID: "route3",
NetID: "net2",
Network: netip.MustParsePrefix("192.168.1.0/24"),
Peer: "peer3",
SkipAutoApply: false,
}
routes := route.HAMap{
"net1|0.0.0.0/0": {exitNode1, exitNode2},
"net2|192.168.1.0/24": {normalRoute},
}
// Test filtering
filtered := rs.FilterSelectedExitNodes(routes)
// Should only include selected exit nodes and all normal routes
assert.Len(t, filtered, 2)
assert.Len(t, filtered["net1|0.0.0.0/0"], 1) // Only the selected exit node
assert.Equal(t, exitNode1.ID, filtered["net1|0.0.0.0/0"][0].ID)
assert.Len(t, filtered["net2|192.168.1.0/24"], 1) // Normal route should be included
assert.Equal(t, normalRoute.ID, filtered["net2|192.168.1.0/24"][0].ID)
// Test with deselected routes
err := rs.DeselectRoutes([]route.NetID{"net1"}, []route.NetID{"net1", "net2"})
assert.NoError(t, err)
filtered = rs.FilterSelectedExitNodes(routes)
assert.Len(t, filtered, 1) // Only normal route should remain
assert.Len(t, filtered["net2|192.168.1.0/24"], 1)
assert.Equal(t, normalRoute.ID, filtered["net2|192.168.1.0/24"][0].ID)
// Test with deselect all
rs = routeselector.NewRouteSelector()
rs.DeselectAllRoutes()
filtered = rs.FilterSelectedExitNodes(routes)
assert.Len(t, filtered, 0) // No routes should be selected
}
func TestRouteSelector_NewRoutesBehavior(t *testing.T) {
initialRoutes := []route.NetID{"route1", "route2", "route3"}
newRoutes := []route.NetID{"route1", "route2", "route3", "route4", "route5"}

View File

@@ -290,7 +290,6 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
@@ -306,13 +305,13 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
permissionsManagerMock := permissions.NewMockManager(ctrl)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

3
go.mod
View File

@@ -65,7 +65,6 @@ require (
github.com/nadoo/ipset v0.5.0
github.com/netbirdio/management-integrations/integrations v0.0.0-20250812185008-dfc66fa49a2e
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
github.com/oapi-codegen/runtime v1.1.2
github.com/okta/okta-sdk-golang/v2 v2.18.0
github.com/oschwald/maxminddb-golang v1.12.0
github.com/patrickmn/go-cache v2.1.0+incompatible
@@ -126,7 +125,6 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Microsoft/hcsshim v0.12.3 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
@@ -222,7 +220,6 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rymdport/portal v0.3.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirect

12
go.sum
View File

@@ -66,14 +66,11 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Microsoft/hcsshim v0.12.3 h1:LS9NXqXhMoqNCplK1ApmVSfB4UnVLRDWRapB6EIlxE0=
github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6CtY2Qg/JVQ=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@@ -119,7 +116,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
@@ -420,7 +416,6 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e h1:LvL4XsI70QxOGHed6yhQtAU34Kx3Qq2wwBzGFKY8zKk=
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -521,8 +516,6 @@ github.com/nicksnyder/go-i18n/v2 v2.4.0/go.mod h1:nxYSZE9M0bf3Y70gPQjN9ha7XNHX7g
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI=
github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/okta/okta-sdk-golang/v2 v2.18.0 h1:cfDasMb7CShbZvOrF6n+DnLevWwiHgedWMGJ8M8xKDc=
github.com/okta/okta-sdk-golang/v2 v2.18.0/go.mod h1:dz30v3ctAiMb7jpsCngGfQUAEGm1/NsWT92uTbNDQIs=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -595,8 +588,8 @@ github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4=
@@ -634,7 +627,6 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c h1:km8GpoQut05eY3GiYWEedbTT0qnSxrCjsVbb7yKY1KE=
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c/go.mod h1:cNQ3dwVJtS5Hmnjxy6AgTPd0Inb3pW05ftPSX7NZO7Q=
github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef h1:Ch6Q+AZUxDBCVqdkI8FSpFyZDtCVBc2VmejdNrm5rRQ=

View File

@@ -17,7 +17,7 @@ upstream signal {
server 127.0.0.1:10000;
}
upstream management {
# insert the grpc+http port of your signal container here
# insert the grpc+http port of your management container here
server 127.0.0.1:8012;
}
@@ -75,4 +75,4 @@ server {
ssl_certificate /etc/ssl/certs/ssl-cert-snakeoil.pem;
ssl_certificate_key /etc/ssl/certs/ssl-cert-snakeoil.pem;
}
}

View File

@@ -145,7 +145,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
}
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
if err != nil {
log.Fatalf("failed to create management server: %v", err)
}

View File

@@ -18,12 +18,6 @@ func (s *BaseServer) PeersUpdateManager() *server.PeersUpdateManager {
})
}
func (s *BaseServer) JobManager() *server.JobManager {
return Create(s, func() *server.JobManager {
return server.NewJobManager(s.Metrics(), s.Store())
})
}
func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValidator {
return Create(s, func() integrated_validator.IntegratedValidator {
integratedPeerValidator, err := integrations.NewIntegratedValidator(context.Background(), s.EventStore())

View File

@@ -60,7 +60,7 @@ func (s *BaseServer) PeersManager() peers.Manager {
func (s *BaseServer) AccountManager() account.Manager {
return Create(s, func() account.Manager {
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
s.dnsDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.config.DisableDefaultPolicy)
if err != nil {
log.Fatalf("failed to create account manager: %v", err)

View File

@@ -68,7 +68,6 @@ type DefaultAccountManager struct {
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
cacheLoading map[string]chan struct{}
peersUpdateManager *PeersUpdateManager
jobManager *JobManager
idpManager idp.Manager
cacheManager *nbcache.AccountUserDataCache
externalCacheManager nbcache.UserDataCache
@@ -175,7 +174,6 @@ func BuildManager(
ctx context.Context,
store store.Store,
peersUpdateManager *PeersUpdateManager,
jobManager *JobManager,
idpManager idp.Manager,
singleAccountModeDomain string,
dnsDomain string,
@@ -198,7 +196,6 @@ func BuildManager(
Store: store,
geo: geo,
peersUpdateManager: peersUpdateManager,
jobManager: jobManager,
idpManager: idpManager,
ctx: context.Background(),
cacheMux: sync.Mutex{},
@@ -1642,11 +1639,6 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start))
}()
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer accountUnlock()
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer peerUnlock()
peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
if err != nil {
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
@@ -1661,18 +1653,12 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
}
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer accountUnlock()
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer peerUnlock()
err := am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID)
if err != nil {
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
}
return nil
}
func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error {
@@ -1681,12 +1667,6 @@ func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey st
return err
}
unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer unlock()
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
defer unlockPeer()
_, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, UpdateAccountPeers: true}, accountID)
if err != nil {
return mapError(ctx, err)

View File

@@ -77,7 +77,7 @@ type Manager interface {
DeletePolicy(ctx context.Context, accountID, policyID, userID string) error
ListPolicies(ctx context.Context, accountID, userID string) ([]*types.Policy, error)
GetRoute(ctx context.Context, accountID string, routeID route.ID, userID string) (*route.Route, error)
CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool) (*route.Route, error)
CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool, skipAutoApply bool) (*route.Route, error)
SaveRoute(ctx context.Context, accountID, userID string, route *route.Route) error
DeleteRoute(ctx context.Context, accountID string, routeID route.ID, userID string) error
ListRoutes(ctx context.Context, accountID, userID string) ([]*route.Route, error)
@@ -123,7 +123,4 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
}

View File

@@ -2891,7 +2891,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, err
}

View File

@@ -178,8 +178,6 @@ const (
AccountNetworkRangeUpdated Activity = 87
PeerIPUpdated Activity = 88
JobCreatedByUser Activity = 89
AccountDeleted Activity = 99999
)
@@ -286,8 +284,6 @@ var activityMap = map[Activity]Code{
AccountNetworkRangeUpdated: {"Account network range updated", "account.network.range.update"},
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
JobCreatedByUser: {"Create Job for peer", "peer.job.create"},
}
// StringCode returns a string code of the activity

View File

@@ -219,7 +219,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
// return empty extra settings for expected calls to UpdateAccountPeers
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createDNSStore(t *testing.T) (store.Store, error) {

View File

@@ -648,7 +648,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, newRoute.Network, newRoute.NetworkType, newRoute.Domains, newRoute.Peer,
newRoute.PeerGroups, newRoute.Description, newRoute.NetID, newRoute.Masquerade, newRoute.Metric,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute, newRoute.SkipAutoApply,
)
require.NoError(t, err)

View File

@@ -2,9 +2,7 @@ package server
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/netip"
"strings"
@@ -47,7 +45,6 @@ type GRPCServer struct {
wgKey wgtypes.Key
proto.UnimplementedManagementServiceServer
peersUpdateManager *PeersUpdateManager
jobManager *JobManager
config *nbconfig.Config
secretsManager SecretsManager
appMetrics telemetry.AppMetrics
@@ -64,7 +61,6 @@ func NewServer(
accountManager account.Manager,
settingsManager settings.Manager,
peersUpdateManager *PeersUpdateManager,
jobManager *JobManager,
secretsManager SecretsManager,
appMetrics telemetry.AppMetrics,
ephemeralManager *EphemeralManager,
@@ -78,9 +74,10 @@ func NewServer(
if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams
if err := appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
return int64(len(peersUpdateManager.peerChannels))
}); err != nil {
})
if err != nil {
return nil, err
}
}
@@ -89,7 +86,6 @@ func NewServer(
wgKey: key,
// peerKey -> event channel
peersUpdateManager: peersUpdateManager,
jobManager: jobManager,
accountManager: accountManager,
settingsManager: settingsManager,
config: config,
@@ -136,46 +132,6 @@ func getRealIP(ctx context.Context) net.IP {
return nil
}
func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
reqStart := time.Now()
ctx := srv.Context()
peerKey, err := s.handleHandshake(ctx, srv)
if err != nil {
return err
}
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
if err != nil {
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
return status.Errorf(codes.PermissionDenied, "peer is not registered")
}
return err
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
if err != nil {
return status.Errorf(codes.Unauthenticated, "peer is not registered")
}
// Start background response handler
s.startResponseReceiver(ctx, srv)
// Prepare per-peer state
updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
// Main loop: forward jobs to client
return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv)
}
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
@@ -191,6 +147,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil {
return err
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
@@ -214,6 +171,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String())
@@ -233,9 +191,10 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err
}
// Prepare per-peer state
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer)
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
if s.appMetrics != nil {
@@ -250,76 +209,6 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
}
func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementService_JobServer) (wgtypes.Key, error) {
hello, err := srv.Recv()
if err != nil {
return wgtypes.Key{}, status.Errorf(codes.InvalidArgument, "missing hello: %v", err)
}
jobReq := &proto.JobRequest{}
peerKey, err := s.parseRequest(ctx, hello, jobReq)
if err != nil {
return wgtypes.Key{}, err
}
return peerKey, nil
}
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() {
for {
msg, err := srv.Recv()
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
return
}
log.WithContext(ctx).Warnf("recv job response error: %v", err)
return
}
jobResp := &proto.JobResponse{}
if _, err := s.parseRequest(ctx, msg, jobResp); err != nil {
log.WithContext(ctx).Warnf("invalid job response: %v", err)
continue
}
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
}
}
}()
}
func (s *GRPCServer) sendJobsLoop(
ctx context.Context,
accountID string,
peerKey wgtypes.Key,
peer *nbpeer.Peer,
updates <-chan *JobEvent,
srv proto.ManagementService_JobServer,
) error {
for {
select {
case job, open := <-updates:
if !open {
log.WithContext(ctx).Debugf("jobs channel for peer %s was closed", peerKey.String())
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return nil
}
if err := s.sendJob(ctx, accountID, peerKey, peer, job, srv); err != nil {
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
log.WithContext(ctx).Warnf("send job failed: %v", err)
}
case <-ctx.Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return ctx.Err()
}
}
}
// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
@@ -337,6 +226,7 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
@@ -359,7 +249,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
s.cancelPeerRoutines(ctx, accountID, peer)
return status.Errorf(codes.Internal, "failed processing update message")
}
err = srv.Send(&proto.EncryptedMessage{
err = srv.SendMsg(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
Body: encryptedResp,
})
@@ -371,27 +261,6 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
return nil
}
// sendJob encrypts the update message using the peer key and the server's wireguard key,
// then sends the encrypted message to the connected peer via the sync server.
func (s *GRPCServer) sendJob(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, job *JobEvent, srv proto.ManagementService_JobServer) error {
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, job.Request)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt job for peer %s: %v", peerKey.String(), err)
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return nil
}
err = srv.Send(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
Body: encryptedResp,
})
if err != nil {
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return status.Errorf(codes.Internal, "failed sending job message")
}
log.WithContext(ctx).Debugf("sent an job to peer %s", peerKey.String())
return nil
}
func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer) {
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
defer unlock()
@@ -860,8 +729,8 @@ func (s *GRPCServer) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Em
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, networkMap *types.NetworkMap, postureChecks []*posture.Checks, srv proto.ManagementService_SyncServer) error {
var err error
var turnToken *Token
var turnToken *Token
if s.config.TURNConfig != nil && s.config.TURNConfig.TimeBasedCredentials {
turnToken, err = s.secretsManager.GenerateTurnToken()
if err != nil {

View File

@@ -14,11 +14,11 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/groups"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
)
// Handler is a handler that returns peers of the account
@@ -32,10 +32,6 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
router.HandleFunc("/peers/{peerId}", peersHandler.HandlePeer).
Methods("GET", "PUT", "DELETE", "OPTIONS")
router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS")
}
// NewHandler creates a new peers Handler
@@ -45,99 +41,6 @@ func NewHandler(accountManager account.Manager) *Handler {
}
}
func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
req := &api.JobRequest{}
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
return
}
job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, req)
if err != nil {
util.WriteError(ctx, err, w)
return
}
if err := h.accountManager.CreatePeerJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil {
util.WriteError(ctx, err, w)
return
}
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
util.WriteJSONObject(ctx, w, resp)
}
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
jobs, err := h.accountManager.GetAllPeerJobs(ctx, userAuth.AccountId, userAuth.UserId, peerID)
if err != nil {
util.WriteError(ctx, err, w)
return
}
respBody := make([]*api.JobResponse, 0, len(jobs))
for _, job := range jobs {
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
respBody = append(respBody, resp)
}
util.WriteJSONObject(ctx, w, respBody)
}
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
jobID := vars["jobId"]
job, err := h.accountManager.GetPeerJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID)
if err != nil {
util.WriteError(ctx, err, w)
return
}
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
util.WriteJSONObject(ctx, w, resp)
}
func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
peerToReturn := peer.Copy()
if peer.Status.Connected {
@@ -451,7 +354,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD
}
return &api.Peer{
CreatedAt: peer.CreatedAt,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
@@ -488,7 +391,7 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
}
return &api.PeerBatch{
CreatedAt: peer.CreatedAt,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
@@ -518,28 +421,6 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
}
}
func toSingleJobResponse(job *types.Job) (*api.JobResponse, error) {
workload, err := job.BuildWorkloadResponse()
if err != nil {
return nil, err
}
var failed *string
if job.FailedReason != "" {
failed = &job.FailedReason
}
return &api.JobResponse{
Id: job.ID,
CreatedAt: job.CreatedAt,
CompletedAt: job.CompletedAt,
TriggeredBy: job.TriggeredBy,
Status: api.JobResponseStatus(job.Status),
FailedReason: failed,
Workload: *workload,
}, nil
}
func fqdn(peer *nbpeer.Peer, dnsDomain string) string {
fqdn := peer.FQDN(dnsDomain)
if fqdn == "" {

View File

@@ -8,17 +8,19 @@ import (
"github.com/gorilla/mux"
"github.com/netbirdio/netbird/shared/management/domain"
"github.com/netbirdio/netbird/management/server/account"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/domain"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/route"
)
const failedToConvertRoute = "failed to convert route to response: %v"
const exitNodeCIDR = "0.0.0.0/0"
// handler is the routes handler of the account
type handler struct {
accountManager account.Manager
@@ -124,8 +126,16 @@ func (h *handler) createRoute(w http.ResponseWriter, r *http.Request) {
accessControlGroupIds = *req.AccessControlGroups
}
// Set default skipAutoApply value for exit nodes (0.0.0.0/0 routes)
skipAutoApply := false
if req.SkipAutoApply != nil {
skipAutoApply = *req.SkipAutoApply
} else if newPrefix.String() == exitNodeCIDR {
skipAutoApply = false
}
newRoute, err := h.accountManager.CreateRoute(r.Context(), accountID, newPrefix, networkType, domains, peerId, peerGroupIds,
req.Description, route.NetID(req.NetworkId), req.Masquerade, req.Metric, req.Groups, accessControlGroupIds, req.Enabled, userID, req.KeepRoute)
req.Description, route.NetID(req.NetworkId), req.Masquerade, req.Metric, req.Groups, accessControlGroupIds, req.Enabled, userID, req.KeepRoute, skipAutoApply)
if err != nil {
util.WriteError(r.Context(), err, w)
@@ -142,23 +152,31 @@ func (h *handler) createRoute(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) validateRoute(req api.PostApiRoutesJSONRequestBody) error {
if req.Network != nil && req.Domains != nil {
return h.validateRouteCommon(req.Network, req.Domains, req.Peer, req.PeerGroups, req.NetworkId)
}
func (h *handler) validateRouteUpdate(req api.PutApiRoutesRouteIdJSONRequestBody) error {
return h.validateRouteCommon(req.Network, req.Domains, req.Peer, req.PeerGroups, req.NetworkId)
}
func (h *handler) validateRouteCommon(network *string, domains *[]string, peer *string, peerGroups *[]string, networkId string) error {
if network != nil && domains != nil {
return status.Errorf(status.InvalidArgument, "only one of 'network' or 'domains' should be provided")
}
if req.Network == nil && req.Domains == nil {
if network == nil && domains == nil {
return status.Errorf(status.InvalidArgument, "either 'network' or 'domains' should be provided")
}
if req.Peer == nil && req.PeerGroups == nil {
if peer == nil && peerGroups == nil {
return status.Errorf(status.InvalidArgument, "either 'peer' or 'peer_groups' should be provided")
}
if req.Peer != nil && req.PeerGroups != nil {
if peer != nil && peerGroups != nil {
return status.Errorf(status.InvalidArgument, "only one of 'peer' or 'peer_groups' should be provided")
}
if utf8.RuneCountInString(req.NetworkId) > route.MaxNetIDChar || req.NetworkId == "" {
if utf8.RuneCountInString(networkId) > route.MaxNetIDChar || networkId == "" {
return status.Errorf(status.InvalidArgument, "identifier should be between 1 and %d characters",
route.MaxNetIDChar)
}
@@ -195,7 +213,7 @@ func (h *handler) updateRoute(w http.ResponseWriter, r *http.Request) {
return
}
if err := h.validateRoute(req); err != nil {
if err := h.validateRouteUpdate(req); err != nil {
util.WriteError(r.Context(), err, w)
return
}
@@ -205,15 +223,24 @@ func (h *handler) updateRoute(w http.ResponseWriter, r *http.Request) {
peerID = *req.Peer
}
// Set default skipAutoApply value for exit nodes (0.0.0.0/0 routes)
skipAutoApply := false
if req.SkipAutoApply != nil {
skipAutoApply = *req.SkipAutoApply
} else if req.Network != nil && *req.Network == exitNodeCIDR {
skipAutoApply = false
}
newRoute := &route.Route{
ID: route.ID(routeID),
NetID: route.NetID(req.NetworkId),
Masquerade: req.Masquerade,
Metric: req.Metric,
Description: req.Description,
Enabled: req.Enabled,
Groups: req.Groups,
KeepRoute: req.KeepRoute,
ID: route.ID(routeID),
NetID: route.NetID(req.NetworkId),
Masquerade: req.Masquerade,
Metric: req.Metric,
Description: req.Description,
Enabled: req.Enabled,
Groups: req.Groups,
KeepRoute: req.KeepRoute,
SkipAutoApply: skipAutoApply,
}
if req.Domains != nil {
@@ -321,18 +348,19 @@ func toRouteResponse(serverRoute *route.Route) (*api.Route, error) {
}
network := serverRoute.Network.String()
route := &api.Route{
Id: string(serverRoute.ID),
Description: serverRoute.Description,
NetworkId: string(serverRoute.NetID),
Enabled: serverRoute.Enabled,
Peer: &serverRoute.Peer,
Network: &network,
Domains: &domains,
NetworkType: serverRoute.NetworkType.String(),
Masquerade: serverRoute.Masquerade,
Metric: serverRoute.Metric,
Groups: serverRoute.Groups,
KeepRoute: serverRoute.KeepRoute,
Id: string(serverRoute.ID),
Description: serverRoute.Description,
NetworkId: string(serverRoute.NetID),
Enabled: serverRoute.Enabled,
Peer: &serverRoute.Peer,
Network: &network,
Domains: &domains,
NetworkType: serverRoute.NetworkType.String(),
Masquerade: serverRoute.Masquerade,
Metric: serverRoute.Metric,
Groups: serverRoute.Groups,
KeepRoute: serverRoute.KeepRoute,
SkipAutoApply: &serverRoute.SkipAutoApply,
}
if len(serverRoute.PeerGroups) > 0 {

View File

@@ -15,13 +15,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/shared/management/domain"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/mock_server"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/domain"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/status"
)
const (
@@ -62,21 +62,22 @@ func initRoutesTestData() *handler {
return &handler{
accountManager: &mock_server.MockAccountManager{
GetRouteFunc: func(_ context.Context, _ string, routeID route.ID, _ string) (*route.Route, error) {
if routeID == existingRouteID {
switch routeID {
case existingRouteID:
return baseExistingRoute, nil
}
if routeID == existingRouteID2 {
case existingRouteID2:
route := baseExistingRoute.Copy()
route.PeerGroups = []string{existingGroupID}
return route, nil
} else if routeID == existingRouteID3 {
case existingRouteID3:
route := baseExistingRoute.Copy()
route.Domains = domain.List{existingDomain}
return route, nil
default:
return nil, status.Errorf(status.NotFound, "route with ID %s not found", routeID)
}
return nil, status.Errorf(status.NotFound, "route with ID %s not found", routeID)
},
CreateRouteFunc: func(_ context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroups []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroups []string, enabled bool, _ string, keepRoute bool) (*route.Route, error) {
CreateRouteFunc: func(_ context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroups []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroups []string, enabled bool, _ string, keepRoute bool, skipAutoApply bool) (*route.Route, error) {
if peerID == notFoundPeerID {
return nil, status.Errorf(status.InvalidArgument, "peer with ID %s not found", peerID)
}
@@ -103,6 +104,7 @@ func initRoutesTestData() *handler {
Groups: groups,
KeepRoute: keepRoute,
AccessControlGroups: accessControlGroups,
SkipAutoApply: skipAutoApply,
}, nil
},
SaveRouteFunc: func(_ context.Context, _, _ string, r *route.Route) error {
@@ -190,19 +192,20 @@ func TestRoutesHandlers(t *testing.T) {
requestType: http.MethodPost,
requestPath: "/api/routes",
requestBody: bytes.NewBuffer(
[]byte(fmt.Sprintf(`{"Description":"Post","Network":"192.168.0.0/16","network_id":"awesomeNet","Peer":"%s","groups":["%s"]}`, existingPeerID, existingGroupID))),
[]byte(fmt.Sprintf(`{"Description":"Post","Network":"192.168.0.0/16","network_id":"awesomeNet","Peer":"%s","groups":["%s"],"skip_auto_apply":false}`, existingPeerID, existingGroupID))),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &existingPeerID,
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &existingPeerID,
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
SkipAutoApply: util.ToPtr(false),
},
},
{
@@ -210,21 +213,22 @@ func TestRoutesHandlers(t *testing.T) {
requestType: http.MethodPost,
requestPath: "/api/routes",
requestBody: bytes.NewBuffer(
[]byte(fmt.Sprintf(`{"description":"Post","domains":["example.com"],"network_id":"domainNet","peer":"%s","groups":["%s"],"keep_route":true}`, existingPeerID, existingGroupID))),
[]byte(fmt.Sprintf(`{"description":"Post","domains":["example.com"],"network_id":"domainNet","peer":"%s","groups":["%s"],"keep_route":true,"skip_auto_apply":false}`, existingPeerID, existingGroupID))),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
Id: existingRouteID,
Description: "Post",
NetworkId: "domainNet",
Network: util.ToPtr("invalid Prefix"),
KeepRoute: true,
Domains: &[]string{existingDomain},
Peer: &existingPeerID,
NetworkType: route.DomainNetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
Id: existingRouteID,
Description: "Post",
NetworkId: "domainNet",
Network: util.ToPtr("invalid Prefix"),
KeepRoute: true,
Domains: &[]string{existingDomain},
Peer: &existingPeerID,
NetworkType: route.DomainNetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
SkipAutoApply: util.ToPtr(false),
},
},
{
@@ -232,7 +236,7 @@ func TestRoutesHandlers(t *testing.T) {
requestType: http.MethodPost,
requestPath: "/api/routes",
requestBody: bytes.NewBuffer(
[]byte(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"Peer\":\"%s\",\"groups\":[\"%s\"],\"access_control_groups\":[\"%s\"]}", existingPeerID, existingGroupID, existingGroupID))),
[]byte(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"Peer\":\"%s\",\"groups\":[\"%s\"],\"access_control_groups\":[\"%s\"],\"skip_auto_apply\":false}", existingPeerID, existingGroupID, existingGroupID))),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
@@ -246,6 +250,7 @@ func TestRoutesHandlers(t *testing.T) {
Enabled: false,
Groups: []string{existingGroupID},
AccessControlGroups: &[]string{existingGroupID},
SkipAutoApply: util.ToPtr(false),
},
},
{
@@ -336,60 +341,63 @@ func TestRoutesHandlers(t *testing.T) {
name: "Network PUT OK",
requestType: http.MethodPut,
requestPath: "/api/routes/" + existingRouteID,
requestBody: bytes.NewBufferString(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"Peer\":\"%s\",\"groups\":[\"%s\"]}", existingPeerID, existingGroupID)),
requestBody: bytes.NewBufferString(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"Peer\":\"%s\",\"groups\":[\"%s\"],\"is_selected\":true}", existingPeerID, existingGroupID)),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &existingPeerID,
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &existingPeerID,
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
SkipAutoApply: util.ToPtr(false),
},
},
{
name: "Domains PUT OK",
requestType: http.MethodPut,
requestPath: "/api/routes/" + existingRouteID,
requestBody: bytes.NewBufferString(fmt.Sprintf(`{"Description":"Post","domains":["example.com"],"network_id":"awesomeNet","Peer":"%s","groups":["%s"],"keep_route":true}`, existingPeerID, existingGroupID)),
requestBody: bytes.NewBufferString(fmt.Sprintf(`{"Description":"Post","domains":["example.com"],"network_id":"awesomeNet","Peer":"%s","groups":["%s"],"keep_route":true,"skip_auto_apply":false}`, existingPeerID, existingGroupID)),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("invalid Prefix"),
Domains: &[]string{existingDomain},
Peer: &existingPeerID,
NetworkType: route.DomainNetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
KeepRoute: true,
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("invalid Prefix"),
Domains: &[]string{existingDomain},
Peer: &existingPeerID,
NetworkType: route.DomainNetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
KeepRoute: true,
SkipAutoApply: util.ToPtr(false),
},
},
{
name: "PUT OK when peer_groups provided",
requestType: http.MethodPut,
requestPath: "/api/routes/" + existingRouteID,
requestBody: bytes.NewBufferString(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"peer_groups\":[\"%s\"],\"groups\":[\"%s\"]}", existingGroupID, existingGroupID)),
requestBody: bytes.NewBufferString(fmt.Sprintf("{\"Description\":\"Post\",\"Network\":\"192.168.0.0/16\",\"network_id\":\"awesomeNet\",\"peer_groups\":[\"%s\"],\"groups\":[\"%s\"],\"skip_auto_apply\":false}", existingGroupID, existingGroupID)),
expectedStatus: http.StatusOK,
expectedBody: true,
expectedRoute: &api.Route{
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &emptyString,
PeerGroups: &[]string{existingGroupID},
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
Id: existingRouteID,
Description: "Post",
NetworkId: "awesomeNet",
Network: util.ToPtr("192.168.0.0/16"),
Peer: &emptyString,
PeerGroups: &[]string{existingGroupID},
NetworkType: route.IPv4NetworkString,
Masquerade: false,
Enabled: false,
Groups: []string{existingGroupID},
SkipAutoApply: util.ToPtr(false),
},
},
{

View File

@@ -119,7 +119,6 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{})
if validateUpdate {
@@ -139,7 +138,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
userManager := users.NewManager(store)
permissionsManager := permissions.NewManager(store)
settingsManager := settings.NewManager(store, userManager, integrations.NewManager(&activity.InMemoryEventStore{}), permissionsManager)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
if err != nil {
t.Fatalf("Failed to create manager: %v", err)
}

View File

@@ -1,223 +0,0 @@
package server
import (
"context"
"fmt"
"sync"
"time"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
)
const jobChannelBuffer = 100
type JobEvent struct {
Job *types.Job
Request *proto.JobRequest
Response *proto.JobResponse
Done chan struct{} // closed when response arrives
StoreEvent func(meta map[string]any, peer *nbpeer.Peer)
}
type JobManager struct {
mu *sync.RWMutex
jobChannels map[string]chan *JobEvent // per-peer job streams
pending map[string]*JobEvent // jobID → event
responseWait time.Duration
metrics telemetry.AppMetrics
Store store.Store
}
func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager {
return &JobManager{
jobChannels: make(map[string]chan *JobEvent),
pending: make(map[string]*JobEvent),
responseWait: 5 * time.Minute,
metrics: metrics,
mu: &sync.RWMutex{},
Store: store,
}
}
// CreateJobChannel creates or replaces a channel for a peer
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) {
// all pending jobs stored in db for this peer should be failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
return nil, err
}
jm.mu.Lock()
defer jm.mu.Unlock()
if ch, ok := jm.jobChannels[peerID]; ok {
close(ch)
delete(jm.jobChannels, peerID)
}
ch := make(chan *JobEvent, jobChannelBuffer)
jm.jobChannels[peerID] = ch
return ch, nil
}
// SendJob sends a job to a peer and tracks it as pending
func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
jm.mu.RLock()
ch, ok := jm.jobChannels[job.PeerID]
jm.mu.RUnlock()
if !ok {
return fmt.Errorf("peer %s has no channel", job.PeerID)
}
req, err := job.ToStreamJobRequest()
if err != nil {
return err
}
event := &JobEvent{
Job: job,
Request: req,
Done: make(chan struct{}),
StoreEvent: storeEvent,
}
jm.mu.Lock()
jm.pending[string(req.ID)] = event
jm.mu.Unlock()
select {
case ch <- event:
case <-time.After(5 * time.Second):
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", job.PeerID)
}
select {
case <-event.Done:
return nil
case <-time.After(jm.responseWait):
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job %s timed out", req.ID)
case <-ctx.Done():
jm.cleanup(ctx, string(req.ID), ctx.Err().Error())
return ctx.Err()
}
}
// HandleResponse marks a job as finished and moves it to completed
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
jm.mu.Lock()
defer jm.mu.Unlock()
event, ok := jm.pending[string(resp.ID)]
if !ok {
return fmt.Errorf("job %s not found", resp.ID)
}
fmt.Printf("we got this %+v\n", resp)
//update or create the store for job response
err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent)
if err == nil {
event.Response = resp
}
close(event.Done)
delete(jm.pending, string(resp.ID))
return err
}
// CloseChannel closes a peers channel and cleans up its jobs
func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string) {
jm.mu.Lock()
defer jm.mu.Unlock()
if ch, ok := jm.jobChannels[peerID]; ok {
close(ch)
jm.jobChannels[peerID] = nil
delete(jm.jobChannels, peerID)
}
for jobID, ev := range jm.pending {
if ev.Job.PeerID == peerID {
// if the client disconnect and there is pending job then marke it as failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
}
// cleanup removes a pending job safely
func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) {
jm.mu.Lock()
defer jm.mu.Unlock()
if ev, ok := jm.pending[jobID]; ok {
close(ev.Done)
if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
func (jm *JobManager) IsPeerConnected(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
_, ok := jm.jobChannels[peerID]
return ok
}
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
for _, ev := range jm.pending {
if ev.Job.PeerID == peerID {
return true
}
}
return false
}
func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, StoreEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
var peer *nbpeer.Peer
var err error
var eventsToStore func()
// persist job in DB only if send succeeded
err = jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID)
if err != nil {
return err
}
if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil {
return err
}
jobMeta := map[string]any{
"job_id": job.ID,
"for_peer_id": job.PeerID,
"job_type": job.Workload.Type,
"job_status": job.Status,
"job_workload": job.Workload,
}
eventsToStore = func() {
StoreEvent(jobMeta, peer)
}
return nil
})
if err != nil {
return err
}
eventsToStore()
return nil
}

View File

@@ -427,7 +427,6 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
}
peersUpdateManager := NewPeersUpdateManager(nil)
jobManager := NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
@@ -451,7 +450,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()
accountManager, err := BuildManager(ctx, store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted",
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",
eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
@@ -462,7 +461,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
secretsManager := NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
ephemeralMgr := NewEphemeralManager(store, accountManager)
mgmtServer, err := NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{})
mgmtServer, err := NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{})
if err != nil {
return nil, nil, "", cleanup, err
}

View File

@@ -176,7 +176,6 @@ func startServer(
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, str)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
@@ -203,7 +202,6 @@ func startServer(
context.Background(),
str,
peersUpdateManager,
jobManager,
nil,
"",
"netbird.selfhosted",
@@ -228,7 +226,6 @@ func startServer(
accountManager,
settingsMockManager,
peersUpdateManager,
jobManager,
secretsManager,
nil,
nil,

View File

@@ -61,7 +61,7 @@ type MockAccountManager struct {
UpdatePeerMetaFunc func(ctx context.Context, peerID string, meta nbpeer.PeerSystemMeta) error
UpdatePeerFunc func(ctx context.Context, accountID, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, error)
UpdatePeerIPFunc func(ctx context.Context, accountID, userID, peerID string, newIP netip.Addr) error
CreateRouteFunc func(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peer string, peerGroups []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool) (*route.Route, error)
CreateRouteFunc func(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peer string, peerGroups []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool, isSelected bool) (*route.Route, error)
GetRouteFunc func(ctx context.Context, accountID string, routeID route.ID, userID string) (*route.Route, error)
SaveRouteFunc func(ctx context.Context, accountID string, userID string, route *route.Route) error
DeleteRouteFunc func(ctx context.Context, accountID string, routeID route.ID, userID string) error
@@ -123,32 +123,8 @@ type MockAccountManager struct {
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
CreatePeerJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
GetAllPeerJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
GetPeerJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
}
func (am *MockAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
if am.CreatePeerJobFunc != nil {
return am.CreatePeerJobFunc(ctx, accountID, peerID, userID, job)
}
return status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
}
func (am *MockAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
if am.CreatePeerJobFunc != nil {
return am.GetAllPeerJobsFunc(ctx, accountID, userID, peerID)
}
return nil, status.Errorf(codes.Unimplemented, "method GetAllJobs is not implemented")
}
func (am *MockAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
if am.CreatePeerJobFunc != nil {
return am.GetPeerJobByIDFunc(ctx, accountID, userID, peerID, jobID)
}
return nil, status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
}
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
if am.SaveGroupFunc != nil {
return am.SaveGroupFunc(ctx, accountID, userID, group, true)
@@ -516,9 +492,9 @@ func (am *MockAccountManager) UpdatePeerIP(ctx context.Context, accountID, userI
}
// CreateRoute mock implementation of CreateRoute from server.AccountManager interface
func (am *MockAccountManager) CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupID []string, enabled bool, userID string, keepRoute bool) (*route.Route, error) {
func (am *MockAccountManager) CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupID []string, enabled bool, userID string, keepRoute bool, isSelected bool) (*route.Route, error) {
if am.CreateRouteFunc != nil {
return am.CreateRouteFunc(ctx, accountID, prefix, networkType, domains, peerID, peerGroupIDs, description, netID, masquerade, metric, groups, accessControlGroupID, enabled, userID, keepRoute)
return am.CreateRouteFunc(ctx, accountID, prefix, networkType, domains, peerID, peerGroupIDs, description, netID, masquerade, metric, groups, accessControlGroupID, enabled, userID, keepRoute, isSelected)
}
return nil, status.Errorf(codes.Unimplemented, "method CreateRoute is not implemented")
}

View File

@@ -785,7 +785,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createNSStore(t *testing.T) (store.Store, error) {

View File

@@ -333,98 +333,6 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
return peer, nil
}
func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
}
if !allowed {
return status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return err
}
if peerAccountID != accountID {
return status.NewPeerNotPartOfAccountError()
}
// check if peer connected
if !am.jobManager.IsPeerConnected(peerID) {
return status.Errorf(status.BadRequest, "peer not connected")
}
// check if already has pending jobs
if am.jobManager.IsPeerHasPendingJobs(peerID) {
return status.Errorf(status.BadRequest, "peer already hase pending job")
}
// try sending job first
if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) {
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta)
}); err != nil {
return status.Errorf(status.Internal, "failed to send job: %v", err)
}
return nil
}
func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return nil, status.NewPermissionValidationError(err)
}
if !allowed {
return nil, status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return nil, err
}
if peerAccountID != accountID {
return []*types.Job{}, nil
}
accountJobs, err := am.Store.GetPeerJobs(accountID, peerID)
if err != nil {
return nil, err
}
return accountJobs, nil
}
func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return nil, status.NewPermissionValidationError(err)
}
if !allowed {
return nil, status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return nil, err
}
if peerAccountID != accountID {
return &types.Job{}, nil
}
job, err := am.Store.GetPeerJobByID(accountID, jobID)
if err != nil {
return nil, err
}
return job, nil
}
// DeletePeer removes peer from the account by its IP
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
@@ -701,13 +609,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
newPeer.DNSLabel = freeLabel
newPeer.IP = freeIP
unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
defer func() {
if unlock != nil {
unlock()
}
}()
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
err = transaction.AddPeerToAccount(ctx, newPeer)
if err != nil {
@@ -759,14 +660,10 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
return nil
})
if err == nil {
unlock()
unlock = nil
break
}
if isUniqueConstraintError(err) {
unlock()
unlock = nil
log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
continue
}
@@ -925,15 +822,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
}
}
unlockAccount := am.Store.AcquireReadLockByUID(ctx, accountID)
defer unlockAccount()
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, login.WireGuardPubKey)
defer func() {
if unlockPeer != nil {
unlockPeer()
}
}()
var peer *nbpeer.Peer
var updateRemotePeers bool
var isRequiresApproval bool
@@ -1014,9 +902,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
return nil, nil, nil, err
}
unlockPeer()
unlockPeer = nil
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.BufferUpdateAccountPeers(ctx, accountID)
}

View File

@@ -1274,7 +1274,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1354,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
AnyTimes()
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1502,7 +1502,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1576,7 +1576,7 @@ func Test_LoginPeer(t *testing.T) {
AnyTimes()
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1973,7 +1973,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, route.Network, route.NetworkType, route.Domains, route.Peer,
route.PeerGroups, route.Description, route.NetID, route.Masquerade, route.Metric,
route.Groups, []string{}, true, userID, route.KeepRoute,
route.Groups, []string{}, true, userID, route.KeepRoute, route.SkipAutoApply,
)
require.NoError(t, err)

View File

@@ -134,7 +134,7 @@ func getRouteDescriptor(prefix netip.Prefix, domains domain.List) string {
}
// CreateRoute creates and saves a new route
func (am *DefaultAccountManager) CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool) (*route.Route, error) {
func (am *DefaultAccountManager) CreateRoute(ctx context.Context, accountID string, prefix netip.Prefix, networkType route.NetworkType, domains domain.List, peerID string, peerGroupIDs []string, description string, netID route.NetID, masquerade bool, metric int, groups, accessControlGroupIDs []string, enabled bool, userID string, keepRoute bool, skipAutoApply bool) (*route.Route, error) {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
@@ -170,6 +170,7 @@ func (am *DefaultAccountManager) CreateRoute(ctx context.Context, accountID stri
Enabled: enabled,
Groups: groups,
AccessControlGroups: accessControlGroupIDs,
SkipAutoApply: skipAutoApply,
}
if err = validateRoute(ctx, transaction, accountID, newRoute); err != nil {
@@ -382,15 +383,16 @@ func validateRouteGroups(ctx context.Context, transaction store.Store, accountID
func toProtocolRoute(route *route.Route) *proto.Route {
return &proto.Route{
ID: string(route.ID),
NetID: string(route.NetID),
Network: route.Network.String(),
Domains: route.Domains.ToPunycodeList(),
NetworkType: int64(route.NetworkType),
Peer: route.Peer,
Metric: int64(route.Metric),
Masquerade: route.Masquerade,
KeepRoute: route.KeepRoute,
ID: string(route.ID),
NetID: string(route.NetID),
Network: route.Network.String(),
Domains: route.Domains.ToPunycodeList(),
NetworkType: int64(route.NetworkType),
Peer: route.Peer,
Metric: int64(route.Metric),
Masquerade: route.Masquerade,
KeepRoute: route.KeepRoute,
SkipAutoApply: route.SkipAutoApply,
}
}

View File

@@ -69,6 +69,7 @@ func TestCreateRoute(t *testing.T) {
enabled bool
groups []string
accessControlGroups []string
skipAutoApply bool
}
testCases := []struct {
@@ -444,13 +445,13 @@ func TestCreateRoute(t *testing.T) {
if testCase.createInitRoute {
groupAll, errInit := account.GetGroupAll()
require.NoError(t, errInit)
_, errInit = am.CreateRoute(context.Background(), account.Id, existingNetwork, 1, nil, "", []string{routeGroup3, routeGroup4}, "", existingRouteID, false, 1000, []string{groupAll.ID}, []string{}, true, userID, false)
_, errInit = am.CreateRoute(context.Background(), account.Id, existingNetwork, 1, nil, "", []string{routeGroup3, routeGroup4}, "", existingRouteID, false, 1000, []string{groupAll.ID}, []string{}, true, userID, false, true)
require.NoError(t, errInit)
_, errInit = am.CreateRoute(context.Background(), account.Id, netip.Prefix{}, 3, existingDomains, "", []string{routeGroup3, routeGroup4}, "", existingRouteID, false, 1000, []string{groupAll.ID}, []string{groupAll.ID}, true, userID, false)
_, errInit = am.CreateRoute(context.Background(), account.Id, netip.Prefix{}, 3, existingDomains, "", []string{routeGroup3, routeGroup4}, "", existingRouteID, false, 1000, []string{groupAll.ID}, []string{groupAll.ID}, true, userID, false, true)
require.NoError(t, errInit)
}
outRoute, err := am.CreateRoute(context.Background(), account.Id, testCase.inputArgs.network, testCase.inputArgs.networkType, testCase.inputArgs.domains, testCase.inputArgs.peerKey, testCase.inputArgs.peerGroupIDs, testCase.inputArgs.description, testCase.inputArgs.netID, testCase.inputArgs.masquerade, testCase.inputArgs.metric, testCase.inputArgs.groups, testCase.inputArgs.accessControlGroups, testCase.inputArgs.enabled, userID, testCase.inputArgs.keepRoute)
outRoute, err := am.CreateRoute(context.Background(), account.Id, testCase.inputArgs.network, testCase.inputArgs.networkType, testCase.inputArgs.domains, testCase.inputArgs.peerKey, testCase.inputArgs.peerGroupIDs, testCase.inputArgs.description, testCase.inputArgs.netID, testCase.inputArgs.masquerade, testCase.inputArgs.metric, testCase.inputArgs.groups, testCase.inputArgs.accessControlGroups, testCase.inputArgs.enabled, userID, testCase.inputArgs.keepRoute, testCase.inputArgs.skipAutoApply)
testCase.errFunc(t, err)
@@ -1084,7 +1085,7 @@ func TestGetNetworkMap_RouteSyncPeerGroups(t *testing.T) {
require.NoError(t, err)
require.Len(t, newAccountRoutes.Routes, 0, "new accounts should have no routes")
newRoute, err := am.CreateRoute(context.Background(), account.Id, baseRoute.Network, baseRoute.NetworkType, baseRoute.Domains, baseRoute.Peer, baseRoute.PeerGroups, baseRoute.Description, baseRoute.NetID, baseRoute.Masquerade, baseRoute.Metric, baseRoute.Groups, baseRoute.AccessControlGroups, baseRoute.Enabled, userID, baseRoute.KeepRoute)
newRoute, err := am.CreateRoute(context.Background(), account.Id, baseRoute.Network, baseRoute.NetworkType, baseRoute.Domains, baseRoute.Peer, baseRoute.PeerGroups, baseRoute.Description, baseRoute.NetID, baseRoute.Masquerade, baseRoute.Metric, baseRoute.Groups, baseRoute.AccessControlGroups, baseRoute.Enabled, userID, baseRoute.KeepRoute, baseRoute.SkipAutoApply)
require.NoError(t, err)
require.Equal(t, newRoute.Enabled, true)
@@ -1176,7 +1177,7 @@ func TestGetNetworkMap_RouteSync(t *testing.T) {
require.NoError(t, err)
require.Len(t, newAccountRoutes.Routes, 0, "new accounts should have no routes")
createdRoute, err := am.CreateRoute(context.Background(), account.Id, baseRoute.Network, baseRoute.NetworkType, baseRoute.Domains, peer1ID, []string{}, baseRoute.Description, baseRoute.NetID, baseRoute.Masquerade, baseRoute.Metric, baseRoute.Groups, baseRoute.AccessControlGroups, false, userID, baseRoute.KeepRoute)
createdRoute, err := am.CreateRoute(context.Background(), account.Id, baseRoute.Network, baseRoute.NetworkType, baseRoute.Domains, peer1ID, []string{}, baseRoute.Description, baseRoute.NetID, baseRoute.Masquerade, baseRoute.Metric, baseRoute.Groups, baseRoute.AccessControlGroups, false, userID, baseRoute.KeepRoute, baseRoute.SkipAutoApply)
require.NoError(t, err)
noDisabledRoutes, err := am.GetNetworkMap(context.Background(), peer1ID)
@@ -1284,7 +1285,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createRouterStore(t *testing.T) (store.Store, error) {
@@ -2004,7 +2005,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, route.Network, route.NetworkType, route.Domains, route.Peer,
route.PeerGroups, route.Description, route.NetID, route.Masquerade, route.Metric,
route.Groups, []string{}, true, userID, route.KeepRoute,
route.Groups, []string{}, true, userID, route.KeepRoute, route.SkipAutoApply,
)
require.NoError(t, err)
@@ -2040,7 +2041,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, route.Network, route.NetworkType, route.Domains, route.Peer,
route.PeerGroups, route.Description, route.NetID, route.Masquerade, route.Metric,
route.Groups, []string{}, true, userID, route.KeepRoute,
route.Groups, []string{}, true, userID, route.KeepRoute, route.SkipAutoApply,
)
require.NoError(t, err)
@@ -2076,7 +2077,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
newRoute, err := manager.CreateRoute(
context.Background(), account.Id, baseRoute.Network, baseRoute.NetworkType, baseRoute.Domains, baseRoute.Peer,
baseRoute.PeerGroups, baseRoute.Description, baseRoute.NetID, baseRoute.Masquerade, baseRoute.Metric,
baseRoute.Groups, []string{}, true, userID, baseRoute.KeepRoute,
baseRoute.Groups, []string{}, true, userID, baseRoute.KeepRoute, !baseRoute.SkipAutoApply,
)
require.NoError(t, err)
baseRoute = *newRoute
@@ -2142,7 +2143,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, newRoute.Network, newRoute.NetworkType, newRoute.Domains, newRoute.Peer,
newRoute.PeerGroups, newRoute.Description, newRoute.NetID, newRoute.Masquerade, newRoute.Metric,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute, !newRoute.SkipAutoApply,
)
require.NoError(t, err)
@@ -2182,7 +2183,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
_, err := manager.CreateRoute(
context.Background(), account.Id, newRoute.Network, newRoute.NetworkType, newRoute.Domains, newRoute.Peer,
newRoute.PeerGroups, newRoute.Description, newRoute.NetID, newRoute.Masquerade, newRoute.Metric,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute,
newRoute.Groups, []string{}, true, userID, newRoute.KeepRoute, !newRoute.SkipAutoApply,
)
require.NoError(t, err)

View File

@@ -34,20 +34,18 @@ import (
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
const (
storeSqliteFileName = "store.db"
idQueryCondition = "id = ?"
keyQueryCondition = "key = ?"
mysqlKeyQueryCondition = "`key` = ?"
accountAndIDQueryCondition = "account_id = ? and id = ?"
accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?"
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
accountIDCondition = "account_id = ?"
peerNotFoundFMT = "peer %s not found"
storeSqliteFileName = "store.db"
idQueryCondition = "id = ?"
keyQueryCondition = "key = ?"
mysqlKeyQueryCondition = "`key` = ?"
accountAndIDQueryCondition = "account_id = ? and id = ?"
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
accountIDCondition = "account_id = ?"
peerNotFoundFMT = "peer %s not found"
)
// SqlStore represents an account storage backed by a Sql DB persisted to disk
@@ -108,7 +106,6 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
&types.Job{},
)
if err != nil {
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
@@ -127,71 +124,6 @@ func GetKeyQueryCondition(s *SqlStore) string {
return keyQueryCondition
}
// SaveJob persists a job in DB
func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error {
if err := job.ApplyResponse(jobResponse); err != nil {
return status.Errorf(status.Internal, err.Error())
}
fmt.Printf("new job or update %v\n", job)
result := s.db.
Model(&types.Job{}).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}),
}).Create(job)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to create job in store")
}
return nil
}
// job was pending for too long and has been cancelled
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
now := time.Now().UTC()
result := s.db.
Model(&types.Job{}).
Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending).
Updates(types.Job{
Status: types.JobStatusFailed,
FailedReason: reason,
CompletedAt: &now,
})
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
}
return nil
}
// GetJobByID fetches job by ID
func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) {
var job types.Job
err := s.db.
Where(accountAndIDQueryCondition, accountID, jobID).
First(&job).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, status.Errorf(status.NotFound, "job %s not found", jobID)
}
return &job, err
}
// get all jobs
func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) {
var jobs []*types.Job
err := s.db.
Where(accountAndPeerIDQueryCondition, accountID, peerID).
Order("created_at DESC").
Find(&jobs).Error
if err != nil {
return nil, err
}
return jobs, nil
}
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
log.WithContext(ctx).Tracef("acquiring global lock")
@@ -231,25 +163,6 @@ func (s *SqlStore) AcquireWriteLockByUID(ctx context.Context, uniqueID string) (
return unlock
}
// AcquireReadLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
func (s *SqlStore) AcquireReadLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
log.WithContext(ctx).Tracef("acquiring read lock for ID %s", uniqueID)
startWait := time.Now()
value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
mtx := value.(*sync.RWMutex)
mtx.RLock()
log.WithContext(ctx).Tracef("waiting to acquire read lock for ID %s in %v", uniqueID, time.Since(startWait))
startHold := time.Now()
unlock = func() {
mtx.RUnlock()
log.WithContext(ctx).Tracef("released read lock for ID %s in %v", uniqueID, time.Since(startHold))
}
return unlock
}
// Deprecated: Full account operations are no longer supported
func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) error {
start := time.Now()

View File

@@ -26,7 +26,6 @@ import (
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/testutil"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/management/server/migration"
@@ -171,8 +170,6 @@ type Store interface {
// AcquireWriteLockByUID should attempt to acquire a lock for write purposes and return a function that releases the lock
AcquireWriteLockByUID(ctx context.Context, uniqueID string) func()
// AcquireReadLockByUID should attempt to acquire lock for read purposes and return a function that releases the lock
AcquireReadLockByUID(ctx context.Context, uniqueID string) func()
// AcquireGlobalLock should attempt to acquire a global lock and return a function that releases the lock
AcquireGlobalLock(ctx context.Context) func()
@@ -206,10 +203,6 @@ type Store interface {
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
MarkAccountPrimary(ctx context.Context, accountID string) error
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
GetPeerJobByID(accountID, jobID string) (*types.Job, error)
GetPeerJobs(accountID, peerID string) ([]*types.Job, error)
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error
}
const (

View File

@@ -1,216 +0,0 @@
package types
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
type JobStatus string
const (
JobStatusPending JobStatus = "pending"
JobStatusSucceeded JobStatus = "succeeded"
JobStatusFailed JobStatus = "failed"
)
type JobType string
const (
JobTypeBundle JobType = "bundle"
)
type Job struct {
// ID is the primary identifier
ID string `gorm:"primaryKey"`
// CreatedAt when job was created (UTC)
CreatedAt time.Time `gorm:"autoCreateTime"`
// CompletedAt when job finished, null if still running
CompletedAt *time.Time
// TriggeredBy user that triggered this job
TriggeredBy string `gorm:"index"`
PeerID string `gorm:"index"`
AccountID string `gorm:"index"`
// Status of the job: pending, succeeded, failed
Status JobStatus `gorm:"index;type:varchar(50)"`
// FailedReason describes why the job failed (if failed)
FailedReason string
Workload Workload `gorm:"embedded;embeddedPrefix:workload_"`
}
type Workload struct {
Type JobType `gorm:"column:workload_type;index;type:varchar(50)"`
Parameters json.RawMessage `gorm:"type:json"`
Result json.RawMessage `gorm:"type:json"`
}
// NewJob creates a new job with default fields and validation
func NewJob(triggeredBy, accountID, peerID string, req *api.JobRequest) (*Job, error) {
if req == nil {
return nil, status.Errorf(status.BadRequest, "job request cannot be nil")
}
// Determine job type
jobTypeStr, err := req.Workload.Discriminator()
if err != nil {
return nil, status.Errorf(status.BadRequest, "could not determine job type: %v", err)
}
jobType := JobType(jobTypeStr)
if jobType == "" {
return nil, status.Errorf(status.BadRequest, "job type is required")
}
var workload Workload
switch jobType {
case JobTypeBundle:
if err := validateAndBuildBundleParams(req.Workload, &workload); err != nil {
return nil, status.Errorf(status.BadRequest, "%v", err)
}
default:
return nil, status.Errorf(status.BadRequest, "unsupported job type: %s", jobType)
}
return &Job{
ID: uuid.New().String(),
TriggeredBy: triggeredBy,
PeerID: peerID,
AccountID: accountID,
Status: JobStatusPending,
CreatedAt: time.Now().UTC(),
Workload: workload,
}, nil
}
func (j *Job) BuildWorkloadResponse() (*api.WorkloadResponse, error) {
var wl api.WorkloadResponse
switch j.Workload.Type {
case JobTypeBundle:
if err := j.buildBundleResponse(&wl); err != nil {
return nil, status.Errorf(status.InvalidArgument, err.Error())
}
return &wl, nil
default:
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
}
}
func (j *Job) buildBundleResponse(wl *api.WorkloadResponse) error {
var p api.BundleParameters
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
return fmt.Errorf("invalid parameters for bundle job: %w", err)
}
var r api.BundleResult
if err := json.Unmarshal(j.Workload.Result, &r); err != nil {
return fmt.Errorf("invalid result for bundle job: %w", err)
}
if err := wl.FromBundleWorkloadResponse(api.BundleWorkloadResponse{
Type: api.WorkloadTypeBundle,
Parameters: p,
Result: r,
}); err != nil {
return fmt.Errorf("unknown job parameters: %v", err)
}
return nil
}
func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) error {
bundle, err := req.AsBundleWorkloadRequest()
if err != nil {
return fmt.Errorf("invalid parameters for bundle job")
}
// validate bundle_for_time <= 5 minutes
if bundle.Parameters.BundleForTime < 0 || bundle.Parameters.BundleForTime > 5 {
return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", bundle.Parameters.BundleForTime)
}
// validate log-file-count ≥ 1 and ≤ 1000
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
}
workload.Parameters, err = json.Marshal(bundle.Parameters)
if err != nil {
return fmt.Errorf("failed to marshal workload parameters: %w", err)
}
workload.Result = []byte("{}")
workload.Type = JobType(api.WorkloadTypeBundle)
return nil
}
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
if resp == nil {
return nil
}
now := time.Now().UTC()
j.CompletedAt = &now
switch resp.Status {
case proto.JobStatus_succeeded:
j.Status = JobStatusSucceeded
case proto.JobStatus_failed:
j.Status = JobStatusFailed
default:
j.Status = JobStatusPending
}
if len(resp.Reason) > 0 {
j.FailedReason = string(resp.Reason)
}
// Handle workload results (oneof)
var err error
switch r := resp.WorkloadResults.(type) {
case *proto.JobResponse_Bundle:
if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
return fmt.Errorf("failed to marshal workload results: %w", err)
}
default:
return fmt.Errorf("unsupported workload response type: %T", r)
}
return nil
}
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
switch j.Workload.Type {
case JobTypeBundle:
return j.buildStreamBundleResponse()
default:
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
}
}
func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) {
var p api.BundleParameters
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
}
return &proto.JobRequest{
ID: []byte(j.ID),
WorkloadParameters: &proto.JobRequest_Bundle{
Bundle: &proto.BundleParameters{
BundleFor: p.BundleFor,
BundleForTime: int64(p.BundleForTime),
LogFileCount: int32(p.LogFileCount),
Anonymize: p.Anonymize,
},
},
}, nil
}

View File

@@ -107,6 +107,8 @@ type Route struct {
Enabled bool
Groups []string `gorm:"serializer:json"`
AccessControlGroups []string `gorm:"serializer:json"`
// SkipAutoApply indicates if this exit node route (0.0.0.0/0) should skip auto-application for client routing
SkipAutoApply bool
}
// EventMeta returns activity event meta related to the route
@@ -136,6 +138,7 @@ func (r *Route) Copy() *Route {
Enabled: r.Enabled,
Groups: slices.Clone(r.Groups),
AccessControlGroups: slices.Clone(r.AccessControlGroups),
SkipAutoApply: r.SkipAutoApply,
}
return route
}
@@ -162,7 +165,8 @@ func (r *Route) Equal(other *Route) bool {
other.Enabled == r.Enabled &&
slices.Equal(r.Groups, other.Groups) &&
slices.Equal(r.PeerGroups, other.PeerGroups) &&
slices.Equal(r.AccessControlGroups, other.AccessControlGroups)
slices.Equal(r.AccessControlGroups, other.AccessControlGroups) &&
other.SkipAutoApply == r.SkipAutoApply
}
// IsDynamic returns if the route is dynamic, i.e. has domains

View File

@@ -14,7 +14,6 @@ import (
type Client interface {
io.Closer
Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
GetServerPublicKey() (*wgtypes.Key, error)
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)

View File

@@ -71,7 +71,6 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
@@ -109,7 +108,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
Return(true, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}
@@ -117,7 +116,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
groupsManager := groups.NewManagerMock()
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}

View File

@@ -12,7 +12,6 @@ import (
gstatus "google.golang.org/grpc/status"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
@@ -112,25 +111,6 @@ func (c *GrpcClient) ready() bool {
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler)
})
}
// Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
return c.handleJobStream(ctx, serverPubKey, msgHandler)
})
}
// withMgmtStream runs a streaming operation against the ManagementService
// It takes care of retries, connection readiness, and fetching server public key.
func (c *GrpcClient) withMgmtStream(
ctx context.Context,
handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
) error {
operation := func() error {
log.Debugf("management connection state %v", c.conn.GetState())
connState := c.conn.GetState()
@@ -148,7 +128,7 @@ func (c *GrpcClient) withMgmtStream(
return err
}
return handler(ctx, *serverPubKey)
return c.handleStream(ctx, *serverPubKey, sysInfo, msgHandler)
}
err := backoff.Retry(operation, defaultBackoff(ctx))
@@ -159,132 +139,12 @@ func (c *GrpcClient) withMgmtStream(
return err
}
func (c *GrpcClient) handleJobStream(
ctx context.Context,
serverPubKey wgtypes.Key,
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) error {
stream, err := c.realClient.Job(ctx)
if err != nil {
log.WithContext(ctx).Errorf("failed to open job stream: %v", err)
return err
}
// Handshake with the server
if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil {
return err
}
log.WithContext(ctx).Debug("job stream handshake sent successfully")
// Main loop: receive, process, respond
for {
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
log.WithContext(ctx).Info("job stream closed by server")
return nil
}
log.WithContext(ctx).Errorf("error receiving job request: %v", err)
return err
}
if jobReq == nil || len(jobReq.ID) == 0 {
log.WithContext(ctx).Debug("received unknown or empty job request, skipping")
continue
}
jobResp := c.processJobRequest(ctx, jobReq, msgHandler)
if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil {
return err
}
}
}
// sendHandshake sends the initial handshake message
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
handshakeReq := &proto.JobRequest{
ID: []byte(uuid.New().String()),
}
encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt handshake message: %v", err)
return err
}
return stream.Send(&proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: encHello,
})
}
// receiveJobRequest waits for and decrypts a job request
func (c *GrpcClient) receiveJobRequest(
ctx context.Context,
stream proto.ManagementService_JobClient,
serverPubKey wgtypes.Key,
) (*proto.JobRequest, error) {
encryptedMsg, err := stream.Recv()
if err != nil {
return nil, err
}
jobReq := &proto.JobRequest{}
if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil {
log.WithContext(ctx).Warnf("failed to decrypt job request: %v", err)
return nil, err
}
return jobReq, nil
}
// processJobRequest executes the handler and ensures a valid response
func (c *GrpcClient) processJobRequest(
ctx context.Context,
jobReq *proto.JobRequest,
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) *proto.JobResponse {
jobResp := msgHandler(jobReq)
if jobResp == nil {
jobResp = &proto.JobResponse{
ID: jobReq.ID,
Status: proto.JobStatus_failed,
Reason: []byte("handler returned nil response"),
}
log.WithContext(ctx).Warnf("job handler returned nil for job %s", string(jobReq.ID))
}
return jobResp
}
// sendJobResponse encrypts and sends a job response
func (c *GrpcClient) sendJobResponse(
ctx context.Context,
stream proto.ManagementService_JobClient,
serverPubKey wgtypes.Key,
resp *proto.JobResponse,
) error {
encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
return err
}
if err := stream.Send(&proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: encResp,
}); err != nil {
log.WithContext(ctx).Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
return err
}
log.WithContext(ctx).Debugf("job response sent successfully for job %s", string(resp.ID))
return nil
}
func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info,
msgHandler func(msg *proto.SyncResponse) error) error {
ctx, cancelStream := context.WithCancel(ctx)
defer cancelStream()
stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
stream, err := c.connectToStream(ctx, serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
@@ -297,7 +157,7 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
c.notifyConnected()
// blocking until error
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
err = c.receiveEvents(stream, serverPubKey, msgHandler)
if err != nil {
c.notifyDisconnected(err)
s, _ := gstatus.FromError(err)
@@ -326,7 +186,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
ctx, cancelStream := context.WithCancel(c.ctx)
defer cancelStream()
stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
return nil, err
@@ -359,7 +219,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
return decryptedResp.GetNetworkMap(), nil
}
func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
myPrivateKey := c.key
@@ -378,7 +238,7 @@ func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtyp
return sync, nil
}
func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
for {
update, err := stream.Recv()
if err == io.EOF {

View File

@@ -20,7 +20,6 @@ type MockClient struct {
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
SyncMetaFunc func(sysInfo *system.Info) error
LogoutFunc func() error
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
}
func (m *MockClient) IsHealthy() bool {
@@ -41,13 +40,6 @@ func (m *MockClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
return m.SyncFunc(ctx, sysInfo, msgHandler)
}
func (m *MockClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
if m.JobFunc == nil {
return nil
}
return m.JobFunc(ctx, msgHandler)
}
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
if m.GetServerPublicKeyFunc == nil {
return nil, nil

View File

@@ -11,6 +11,6 @@ fi
old_pwd=$(pwd)
script_path=$(dirname $(realpath "$0"))
cd "$script_path"
go install github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@latest
go install github.com/deepmap/oapi-codegen/cmd/oapi-codegen@4a1477f6a8ba6ca8115cc23bb2fb67f0b9fca18e
oapi-codegen --config cfg.yaml openapi.yml
cd "$old_pwd"
cd "$old_pwd"

View File

@@ -34,111 +34,6 @@ tags:
x-cloud-only: true
components:
schemas:
WorkloadType:
type: string
enum:
- bundle
BundleParameters:
type: object
properties:
bundle_for:
type: boolean
example: true
bundle_for_time:
type: integer
minimum: 0
example: 2
log_file_count:
type: integer
minimum: 0
example: 100
anonymize:
type: boolean
example: false
required:
- bundle_for
- bundle_for_time
- log_file_count
- anonymize
BundleResult:
type: object
properties:
upload_key:
type: string
example: "upload_key_123"
nullable: true
BundleWorkloadRequest:
type: object
properties:
type:
$ref: '#/components/schemas/WorkloadType'
parameters:
$ref: '#/components/schemas/BundleParameters'
required:
- type
- parameters
BundleWorkloadResponse:
type: object
properties:
type:
$ref: '#/components/schemas/WorkloadType'
parameters:
$ref: '#/components/schemas/BundleParameters'
result:
$ref: '#/components/schemas/BundleResult'
required:
- type
- parameters
- result
WorkloadRequest:
oneOf:
- $ref: '#/components/schemas/BundleWorkloadRequest'
discriminator:
propertyName: type
mapping:
bundle: '#/components/schemas/BundleWorkloadRequest'
WorkloadResponse:
oneOf:
- $ref: '#/components/schemas/BundleWorkloadResponse'
discriminator:
propertyName: type
mapping:
bundle: '#/components/schemas/BundleWorkloadResponse'
JobRequest:
type: object
properties:
workload:
$ref: '#/components/schemas/WorkloadRequest'
required:
- workload
JobResponse:
type: object
properties:
id:
type: string
created_at:
type: string
format: date-time
completed_at:
type: string
format: date-time
nullable: true
triggered_by:
type: string
status:
type: string
enum: [pending, succeeded, failed]
failed_reason:
type: string
nullable: true
workload:
$ref: '#/components/schemas/WorkloadResponse'
required:
- id
- created_at
- status
- triggered_by
- workload
Account:
type: object
properties:
@@ -1447,6 +1342,10 @@ components:
items:
type: string
example: "chacbco6lnnbn6cg5s91"
skip_auto_apply:
description: Indicate if this exit node route (0.0.0.0/0) should skip auto-application for client routing
type: boolean
example: false
required:
- id
- description
@@ -2275,108 +2174,6 @@ security:
- BearerAuth: [ ]
- TokenAuth: [ ]
paths:
/api/peers/{peerId}/jobs:
get:
summary: List Jobs
description: Retrieve all jobs for a given peer
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
description: The unique identifier of a peer
responses:
'200':
description: List of jobs
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/JobResponse'
'400':
$ref: '#/components/responses/bad_request'
'401':
$ref: '#/components/responses/requires_authentication'
'403':
$ref: '#/components/responses/forbidden'
'500':
$ref: '#/components/responses/internal_error'
post:
summary: Create Job
description: Create a new job for a given peer
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
description: The unique identifier of a peer
requestBody:
description: Create job request
content:
application/json:
schema:
$ref: '#/components/schemas/JobRequest'
required: true
responses:
'201':
description: Job created
content:
application/json:
schema:
$ref: '#/components/schemas/JobResponse'
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/peers/{peerId}/jobs/{jobId}:
get:
summary: Get Job
description: Retrieve details of a specific job
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
- in: path
name: jobId
required: true
schema:
type: string
responses:
'200':
description: A Job object
content:
application/json:
schema:
$ref: '#/components/schemas/JobResponse'
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/accounts:
get:
summary: List all Accounts

View File

@@ -1,14 +1,10 @@
// Package api provides primitives to interact with the openapi HTTP API.
//
// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.0 DO NOT EDIT.
// Code generated by github.com/deepmap/oapi-codegen version v1.11.1-0.20220912230023-4a1477f6a8ba DO NOT EDIT.
package api
import (
"encoding/json"
"errors"
"time"
"github.com/oapi-codegen/runtime"
)
const (
@@ -108,13 +104,6 @@ const (
IngressPortAllocationRequestPortRangeProtocolUdp IngressPortAllocationRequestPortRangeProtocol = "udp"
)
// Defines values for JobResponseStatus.
const (
JobResponseStatusFailed JobResponseStatus = "failed"
JobResponseStatusPending JobResponseStatus = "pending"
JobResponseStatusSucceeded JobResponseStatus = "succeeded"
)
// Defines values for NameserverNsType.
const (
NameserverNsTypeUdp NameserverNsType = "udp"
@@ -189,11 +178,6 @@ const (
UserStatusInvited UserStatus = "invited"
)
// Defines values for WorkloadType.
const (
WorkloadTypeBundle WorkloadType = "bundle"
)
// Defines values for GetApiEventsNetworkTrafficParamsType.
const (
GetApiEventsNetworkTrafficParamsTypeTYPEDROP GetApiEventsNetworkTrafficParamsType = "TYPE_DROP"
@@ -353,32 +337,6 @@ type AvailablePorts struct {
Udp int `json:"udp"`
}
// BundleParameters defines model for BundleParameters.
type BundleParameters struct {
Anonymize bool `json:"anonymize"`
BundleFor bool `json:"bundle_for"`
BundleForTime int `json:"bundle_for_time"`
LogFileCount int `json:"log_file_count"`
}
// BundleResult defines model for BundleResult.
type BundleResult struct {
UploadKey *string `json:"upload_key"`
}
// BundleWorkloadRequest defines model for BundleWorkloadRequest.
type BundleWorkloadRequest struct {
Parameters BundleParameters `json:"parameters"`
Type WorkloadType `json:"type"`
}
// BundleWorkloadResponse defines model for BundleWorkloadResponse.
type BundleWorkloadResponse struct {
Parameters BundleParameters `json:"parameters"`
Result BundleResult `json:"result"`
Type WorkloadType `json:"type"`
}
// Checks List of objects that perform the actual checks
type Checks struct {
// GeoLocationCheck Posture check for geo location
@@ -685,25 +643,6 @@ type IngressPortAllocationRequestPortRange struct {
// IngressPortAllocationRequestPortRangeProtocol The protocol accepted by the port range
type IngressPortAllocationRequestPortRangeProtocol string
// JobRequest defines model for JobRequest.
type JobRequest struct {
Workload WorkloadRequest `json:"workload"`
}
// JobResponse defines model for JobResponse.
type JobResponse struct {
CompletedAt *time.Time `json:"completed_at"`
CreatedAt time.Time `json:"created_at"`
FailedReason *string `json:"failed_reason"`
Id string `json:"id"`
Status JobResponseStatus `json:"status"`
TriggeredBy string `json:"triggered_by"`
Workload WorkloadResponse `json:"workload"`
}
// JobResponseStatus defines model for JobResponse.Status.
type JobResponseStatus string
// Location Describe geographical location information
type Location struct {
// CityName Commonly used English name of the city
@@ -1076,6 +1015,8 @@ type OSVersionCheck struct {
// Peer defines model for Peer.
type Peer struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// ApprovalRequired (Cloud only) Indicates whether peer needs approval
ApprovalRequired bool `json:"approval_required"`
@@ -1091,9 +1032,6 @@ type Peer struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1160,6 +1098,8 @@ type Peer struct {
// PeerBatch defines model for PeerBatch.
type PeerBatch struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// AccessiblePeersCount Number of accessible peers
AccessiblePeersCount int `json:"accessible_peers_count"`
@@ -1178,9 +1118,6 @@ type PeerBatch struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1604,6 +1541,9 @@ type Route struct {
// PeerGroups Peers Group Identifier associated with route. This property can not be set together with `peer`
PeerGroups *[]string `json:"peer_groups,omitempty"`
// SkipAutoApply Indicate if this exit node route (0.0.0.0/0) should skip auto-application for client routing
SkipAutoApply *bool `json:"skip_auto_apply,omitempty"`
}
// RouteRequest defines model for RouteRequest.
@@ -1643,6 +1583,9 @@ type RouteRequest struct {
// PeerGroups Peers Group Identifier associated with route. This property can not be set together with `peer`
PeerGroups *[]string `json:"peer_groups,omitempty"`
// SkipAutoApply Indicate if this exit node route (0.0.0.0/0) should skip auto-application for client routing
SkipAutoApply *bool `json:"skip_auto_apply,omitempty"`
}
// RulePortRange Policy rule affected ports range
@@ -1881,19 +1824,6 @@ type UserRequest struct {
Role string `json:"role"`
}
// WorkloadRequest defines model for WorkloadRequest.
type WorkloadRequest struct {
union json.RawMessage
}
// WorkloadResponse defines model for WorkloadResponse.
type WorkloadResponse struct {
union json.RawMessage
}
// WorkloadType defines model for WorkloadType.
type WorkloadType string
// GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic.
type GetApiEventsNetworkTrafficParams struct {
// Page Page number
@@ -2011,9 +1941,6 @@ type PostApiPeersPeerIdIngressPortsJSONRequestBody = IngressPortAllocationReques
// PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody defines body for PutApiPeersPeerIdIngressPortsAllocationId for application/json ContentType.
type PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody = IngressPortAllocationRequest
// PostApiPeersPeerIdJobsJSONRequestBody defines body for PostApiPeersPeerIdJobs for application/json ContentType.
type PostApiPeersPeerIdJobsJSONRequestBody = JobRequest
// PostApiPoliciesJSONRequestBody defines body for PostApiPolicies for application/json ContentType.
type PostApiPoliciesJSONRequestBody = PolicyUpdate
@@ -2046,121 +1973,3 @@ type PutApiUsersUserIdJSONRequestBody = UserRequest
// PostApiUsersUserIdTokensJSONRequestBody defines body for PostApiUsersUserIdTokens for application/json ContentType.
type PostApiUsersUserIdTokensJSONRequestBody = PersonalAccessTokenRequest
// AsBundleWorkloadRequest returns the union data inside the WorkloadRequest as a BundleWorkloadRequest
func (t WorkloadRequest) AsBundleWorkloadRequest() (BundleWorkloadRequest, error) {
var body BundleWorkloadRequest
err := json.Unmarshal(t.union, &body)
return body, err
}
// FromBundleWorkloadRequest overwrites any union data inside the WorkloadRequest as the provided BundleWorkloadRequest
func (t *WorkloadRequest) FromBundleWorkloadRequest(v BundleWorkloadRequest) error {
v.Type = "bundle"
b, err := json.Marshal(v)
t.union = b
return err
}
// MergeBundleWorkloadRequest performs a merge with any union data inside the WorkloadRequest, using the provided BundleWorkloadRequest
func (t *WorkloadRequest) MergeBundleWorkloadRequest(v BundleWorkloadRequest) error {
v.Type = "bundle"
b, err := json.Marshal(v)
if err != nil {
return err
}
merged, err := runtime.JSONMerge(t.union, b)
t.union = merged
return err
}
func (t WorkloadRequest) Discriminator() (string, error) {
var discriminator struct {
Discriminator string `json:"type"`
}
err := json.Unmarshal(t.union, &discriminator)
return discriminator.Discriminator, err
}
func (t WorkloadRequest) ValueByDiscriminator() (interface{}, error) {
discriminator, err := t.Discriminator()
if err != nil {
return nil, err
}
switch discriminator {
case "bundle":
return t.AsBundleWorkloadRequest()
default:
return nil, errors.New("unknown discriminator value: " + discriminator)
}
}
func (t WorkloadRequest) MarshalJSON() ([]byte, error) {
b, err := t.union.MarshalJSON()
return b, err
}
func (t *WorkloadRequest) UnmarshalJSON(b []byte) error {
err := t.union.UnmarshalJSON(b)
return err
}
// AsBundleWorkloadResponse returns the union data inside the WorkloadResponse as a BundleWorkloadResponse
func (t WorkloadResponse) AsBundleWorkloadResponse() (BundleWorkloadResponse, error) {
var body BundleWorkloadResponse
err := json.Unmarshal(t.union, &body)
return body, err
}
// FromBundleWorkloadResponse overwrites any union data inside the WorkloadResponse as the provided BundleWorkloadResponse
func (t *WorkloadResponse) FromBundleWorkloadResponse(v BundleWorkloadResponse) error {
v.Type = "bundle"
b, err := json.Marshal(v)
t.union = b
return err
}
// MergeBundleWorkloadResponse performs a merge with any union data inside the WorkloadResponse, using the provided BundleWorkloadResponse
func (t *WorkloadResponse) MergeBundleWorkloadResponse(v BundleWorkloadResponse) error {
v.Type = "bundle"
b, err := json.Marshal(v)
if err != nil {
return err
}
merged, err := runtime.JSONMerge(t.union, b)
t.union = merged
return err
}
func (t WorkloadResponse) Discriminator() (string, error) {
var discriminator struct {
Discriminator string `json:"type"`
}
err := json.Unmarshal(t.union, &discriminator)
return discriminator.Discriminator, err
}
func (t WorkloadResponse) ValueByDiscriminator() (interface{}, error) {
discriminator, err := t.Discriminator()
if err != nil {
return nil, err
}
switch discriminator {
case "bundle":
return t.AsBundleWorkloadResponse()
default:
return nil, errors.New("unknown discriminator value: " + discriminator)
}
}
func (t WorkloadResponse) MarshalJSON() ([]byte, error) {
b, err := t.union.MarshalJSON()
return b, err
}
func (t *WorkloadResponse) UnmarshalJSON(b []byte) error {
err := t.union.UnmarshalJSON(b)
return err
}

File diff suppressed because it is too large Load Diff

View File

@@ -48,9 +48,6 @@ service ManagementService {
// Logout logs out the peer and removes it from the management server
rpc Logout(EncryptedMessage) returns (Empty) {}
// Executes a job on a target peer (e.g., debug bundle)
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
}
message EncryptedMessage {
@@ -63,42 +60,6 @@ message EncryptedMessage {
int32 version = 3;
}
message JobRequest {
bytes ID = 1;
oneof workload_parameters {
BundleParameters bundle = 10;
//OtherParameters other = 11;
}
}
enum JobStatus {
unknown_status = 0; //placeholder
succeeded = 1;
failed = 2;
}
message JobResponse{
bytes ID = 1;
JobStatus status=2;
bytes Reason=3;
oneof workload_results {
BundleResult bundle = 10;
//OtherResult other = 11;
}
}
message BundleParameters {
bool bundle_for = 1;
int64 bundle_for_time = 2;
int32 log_file_count = 3;
bool anonymize = 4;
}
message BundleResult {
string upload_key = 1;
}
message SyncRequest {
// Meta data of the peer
PeerSystemMeta meta = 1;
@@ -439,6 +400,7 @@ message Route {
string NetID = 7;
repeated string Domains = 8;
bool keepRoute = 9;
bool skipAutoApply = 10;
}
// DNSConfig represents a dns.Update

View File

@@ -50,8 +50,6 @@ type ManagementServiceClient interface {
SyncMeta(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Logout logs out the peer and removes it from the management server
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
}
type managementServiceClient struct {
@@ -157,37 +155,6 @@ func (c *managementServiceClient) Logout(ctx context.Context, in *EncryptedMessa
return out, nil
}
func (c *managementServiceClient) Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error) {
stream, err := c.cc.NewStream(ctx, &ManagementService_ServiceDesc.Streams[1], "/management.ManagementService/Job", opts...)
if err != nil {
return nil, err
}
x := &managementServiceJobClient{stream}
return x, nil
}
type ManagementService_JobClient interface {
Send(*EncryptedMessage) error
Recv() (*EncryptedMessage, error)
grpc.ClientStream
}
type managementServiceJobClient struct {
grpc.ClientStream
}
func (x *managementServiceJobClient) Send(m *EncryptedMessage) error {
return x.ClientStream.SendMsg(m)
}
func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
m := new(EncryptedMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ManagementServiceServer is the server API for ManagementService service.
// All implementations must embed UnimplementedManagementServiceServer
// for forward compatibility
@@ -224,8 +191,6 @@ type ManagementServiceServer interface {
SyncMeta(context.Context, *EncryptedMessage) (*Empty, error)
// Logout logs out the peer and removes it from the management server
Logout(context.Context, *EncryptedMessage) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ManagementService_JobServer) error
mustEmbedUnimplementedManagementServiceServer()
}
@@ -257,9 +222,6 @@ func (UnimplementedManagementServiceServer) SyncMeta(context.Context, *Encrypted
func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMessage) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Logout not implemented")
}
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
return status.Errorf(codes.Unimplemented, "method Job not implemented")
}
func (UnimplementedManagementServiceServer) mustEmbedUnimplementedManagementServiceServer() {}
// UnsafeManagementServiceServer may be embedded to opt out of forward compatibility for this service.
@@ -420,32 +382,6 @@ func _ManagementService_Logout_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _ManagementService_Job_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ManagementServiceServer).Job(&managementServiceJobServer{stream})
}
type ManagementService_JobServer interface {
Send(*EncryptedMessage) error
Recv() (*EncryptedMessage, error)
grpc.ServerStream
}
type managementServiceJobServer struct {
grpc.ServerStream
}
func (x *managementServiceJobServer) Send(m *EncryptedMessage) error {
return x.ServerStream.SendMsg(m)
}
func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
m := new(EncryptedMessage)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ManagementService_ServiceDesc is the grpc.ServiceDesc for ManagementService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -488,12 +424,6 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
Handler: _ManagementService_Sync_Handler,
ServerStreams: true,
},
{
StreamName: "Job",
Handler: _ManagementService_Job_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "management.proto",
}