mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-25 01:09:54 +00:00
Compare commits
1 Commits
client-jso
...
handle_syn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7de571e2f4 |
@@ -5,7 +5,6 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -23,21 +22,15 @@ var serviceCmd = &cobra.Command{
|
||||
Short: "Manage the NetBird daemon service",
|
||||
}
|
||||
|
||||
const defaultJSONSocket = "unix:///var/run/netbird-http.sock"
|
||||
|
||||
var (
|
||||
serviceName string
|
||||
serviceEnvVars []string
|
||||
jsonSocket string
|
||||
jsonSocketDisabled bool
|
||||
serviceName string
|
||||
serviceEnvVars []string
|
||||
)
|
||||
|
||||
type program struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
serv *grpc.Server
|
||||
jsonServ *http.Server
|
||||
jsonServMu sync.Mutex
|
||||
serverInstance *server.Server
|
||||
serverInstanceMu sync.Mutex
|
||||
}
|
||||
@@ -53,8 +46,6 @@ func init() {
|
||||
serviceCmd.PersistentFlags().BoolVar(&updateSettingsDisabled, "disable-update-settings", false, "Disables update settings feature. If enabled, the client will not be able to change or edit any settings. To persist this setting, use: netbird service install --disable-update-settings")
|
||||
serviceCmd.PersistentFlags().BoolVar(&captureEnabled, "enable-capture", false, "Enables packet capture via 'netbird debug capture'. To persist, use: netbird service install --enable-capture")
|
||||
serviceCmd.PersistentFlags().BoolVar(&networksDisabled, "disable-networks", false, "Disables network selection. If enabled, the client will not allow listing, selecting, or deselecting networks. To persist, use: netbird service install --disable-networks")
|
||||
serviceCmd.PersistentFlags().StringVar(&jsonSocket, "json-socket", defaultJSONSocket, "HTTP/JSON API socket address served by grpc-gateway [unix|tcp]://[path|host:port]. To persist, use: netbird service install --json-socket")
|
||||
serviceCmd.PersistentFlags().BoolVar(&jsonSocketDisabled, "disable-json-socket", false, "Disables the HTTP/JSON API socket. To persist, use: netbird service install --disable-json-socket")
|
||||
|
||||
rootCmd.PersistentFlags().StringVarP(&serviceName, "service", "s", defaultServiceName, "Netbird system service name")
|
||||
serviceEnvDesc := `Sets extra environment variables for the service. ` +
|
||||
|
||||
@@ -5,6 +5,9 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kardianos/service"
|
||||
@@ -29,35 +32,31 @@ func (p *program) Start(svc service.Service) error {
|
||||
// in any case, even if configuration does not exists we run daemon to serve CLI gRPC API.
|
||||
p.serv = grpc.NewServer()
|
||||
|
||||
daemonListener, err := listenOnAddress(daemonAddr)
|
||||
split := strings.Split(daemonAddr, "://")
|
||||
switch split[0] {
|
||||
case "unix":
|
||||
// cleanup failed close
|
||||
stat, err := os.Stat(split[1])
|
||||
if err == nil && !stat.IsDir() {
|
||||
if err := os.Remove(split[1]); err != nil {
|
||||
log.Debugf("remove socket file: %v", err)
|
||||
}
|
||||
}
|
||||
case "tcp":
|
||||
default:
|
||||
return fmt.Errorf("unsupported daemon address protocol: %v", split[0])
|
||||
}
|
||||
|
||||
listen, err := net.Listen(split[0], split[1])
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen daemon interface: %w", err)
|
||||
}
|
||||
|
||||
var jsonListener *socketListener
|
||||
if !jsonSocketDisabled {
|
||||
jsonListener, err = listenOnAddress(jsonSocket)
|
||||
if err != nil {
|
||||
_ = daemonListener.Close()
|
||||
return fmt.Errorf("listen daemon JSON interface: %w", err)
|
||||
}
|
||||
} else {
|
||||
removeStaleUnixSocketForAddress(jsonSocket)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer daemonListener.Close()
|
||||
if jsonListener != nil {
|
||||
defer jsonListener.Close()
|
||||
}
|
||||
defer listen.Close()
|
||||
|
||||
if err := daemonListener.chmodUnixSocket("daemon"); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if jsonListener != nil {
|
||||
if err := jsonListener.chmodUnixSocket("daemon JSON"); err != nil {
|
||||
log.Error(err)
|
||||
if split[0] == "unix" {
|
||||
if err := os.Chmod(split[1], 0666); err != nil {
|
||||
log.Errorf("failed setting daemon permissions: %v", split[1])
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -72,16 +71,8 @@ func (p *program) Start(svc service.Service) error {
|
||||
p.serverInstance = serverInstance
|
||||
p.serverInstanceMu.Unlock()
|
||||
|
||||
if jsonListener != nil {
|
||||
if err := p.startJSONGateway(jsonListener, daemonAddr); err != nil {
|
||||
log.Fatalf("failed to start daemon JSON server: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Debug("daemon JSON socket disabled")
|
||||
}
|
||||
|
||||
log.Printf("started daemon server: %v", daemonListener.address)
|
||||
if err := p.serv.Serve(daemonListener.Listener); err != nil {
|
||||
log.Printf("started daemon server: %v", split[1])
|
||||
if err := p.serv.Serve(listen); err != nil {
|
||||
log.Errorf("failed to serve daemon requests: %v", err)
|
||||
}
|
||||
}()
|
||||
@@ -101,20 +92,6 @@ func (p *program) Stop(srv service.Service) error {
|
||||
|
||||
p.cancel()
|
||||
|
||||
p.jsonServMu.Lock()
|
||||
jsonServ := p.jsonServ
|
||||
p.jsonServMu.Unlock()
|
||||
if jsonServ != nil {
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
if err := jsonServ.Shutdown(shutdownCtx); err != nil {
|
||||
log.Errorf("failed to stop daemon JSON server gracefully: %v", err)
|
||||
if err := jsonServ.Close(); err != nil {
|
||||
log.Errorf("failed to close daemon JSON server: %v", err)
|
||||
}
|
||||
}
|
||||
shutdownCancel()
|
||||
}
|
||||
|
||||
if p.serv != nil {
|
||||
p.serv.Stop()
|
||||
}
|
||||
|
||||
@@ -67,11 +67,6 @@ func buildServiceArguments() []string {
|
||||
args = append(args, "--disable-networks")
|
||||
}
|
||||
|
||||
args = append(args, "--json-socket", jsonSocket)
|
||||
if jsonSocketDisabled {
|
||||
args = append(args, "--disable-json-socket")
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
//go:build !ios && !android
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/netbirdio/netbird/client/proto"
|
||||
)
|
||||
|
||||
func grpcGatewayEndpoint(addr string) string {
|
||||
return strings.TrimPrefix(addr, "tcp://")
|
||||
}
|
||||
|
||||
func (p *program) startJSONGateway(jsonListener *socketListener, daemonEndpoint string) error {
|
||||
mux := runtime.NewServeMux()
|
||||
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
||||
if err := proto.RegisterDaemonServiceHandlerFromEndpoint(p.ctx, mux, grpcGatewayEndpoint(daemonEndpoint), opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jsonServer := &http.Server{
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
BaseContext: func(net.Listener) context.Context {
|
||||
return p.ctx
|
||||
},
|
||||
}
|
||||
|
||||
p.jsonServMu.Lock()
|
||||
p.jsonServ = jsonServer
|
||||
p.jsonServMu.Unlock()
|
||||
|
||||
go func() {
|
||||
log.Printf("started daemon JSON server: %v", jsonListener.address)
|
||||
if err := jsonServer.Serve(jsonListener.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Errorf("failed to serve daemon JSON requests: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -23,7 +23,6 @@ const serviceParamsFile = "service.json"
|
||||
type serviceParams struct {
|
||||
LogLevel string `json:"log_level"`
|
||||
DaemonAddr string `json:"daemon_addr"`
|
||||
JSONSocket string `json:"json_socket"`
|
||||
ManagementURL string `json:"management_url,omitempty"`
|
||||
ConfigPath string `json:"config_path,omitempty"`
|
||||
LogFiles []string `json:"log_files,omitempty"`
|
||||
@@ -31,7 +30,6 @@ type serviceParams struct {
|
||||
DisableUpdateSettings bool `json:"disable_update_settings,omitempty"`
|
||||
EnableCapture bool `json:"enable_capture,omitempty"`
|
||||
DisableNetworks bool `json:"disable_networks,omitempty"`
|
||||
DisableJSONSocket bool `json:"disable_json_socket,omitempty"`
|
||||
ServiceEnvVars map[string]string `json:"service_env_vars,omitempty"`
|
||||
}
|
||||
|
||||
@@ -77,7 +75,6 @@ func currentServiceParams() *serviceParams {
|
||||
params := &serviceParams{
|
||||
LogLevel: logLevel,
|
||||
DaemonAddr: daemonAddr,
|
||||
JSONSocket: jsonSocket,
|
||||
ManagementURL: managementURL,
|
||||
ConfigPath: configPath,
|
||||
LogFiles: logFiles,
|
||||
@@ -85,7 +82,6 @@ func currentServiceParams() *serviceParams {
|
||||
DisableUpdateSettings: updateSettingsDisabled,
|
||||
EnableCapture: captureEnabled,
|
||||
DisableNetworks: networksDisabled,
|
||||
DisableJSONSocket: jsonSocketDisabled,
|
||||
}
|
||||
|
||||
if len(serviceEnvVars) > 0 {
|
||||
@@ -117,8 +113,9 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
|
||||
return
|
||||
}
|
||||
|
||||
// For fields with non-empty defaults, keep the != "" guard so that an older
|
||||
// service.json missing the field doesn't clobber the default with an empty string.
|
||||
// For fields with non-empty defaults (log-level, daemon-addr), keep the
|
||||
// != "" guard so that an older service.json missing the field doesn't
|
||||
// clobber the default with an empty string.
|
||||
if !rootCmd.PersistentFlags().Changed("log-level") && params.LogLevel != "" {
|
||||
logLevel = params.LogLevel
|
||||
}
|
||||
@@ -127,20 +124,6 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
|
||||
daemonAddr = params.DaemonAddr
|
||||
}
|
||||
|
||||
jsonSocketChanged := serviceCmd.PersistentFlags().Changed("json-socket")
|
||||
if !jsonSocketChanged && params.JSONSocket != "" {
|
||||
jsonSocket = params.JSONSocket
|
||||
}
|
||||
|
||||
if !serviceCmd.PersistentFlags().Changed("disable-json-socket") {
|
||||
jsonSocketDisabled = params.DisableJSONSocket
|
||||
// Passing --json-socket should re-enable the JSON gateway unless
|
||||
// --disable-json-socket was explicitly provided too.
|
||||
if jsonSocketChanged {
|
||||
jsonSocketDisabled = false
|
||||
}
|
||||
}
|
||||
|
||||
// For optional fields where empty means "use default", always apply so
|
||||
// that an explicit clear (--management-url "") persists across reinstalls.
|
||||
if !rootCmd.PersistentFlags().Changed("management-url") {
|
||||
|
||||
@@ -530,7 +530,6 @@ func fieldToGlobalVar(field string) string {
|
||||
m := map[string]string{
|
||||
"LogLevel": "logLevel",
|
||||
"DaemonAddr": "daemonAddr",
|
||||
"JSONSocket": "jsonSocket",
|
||||
"ManagementURL": "managementURL",
|
||||
"ConfigPath": "configPath",
|
||||
"LogFiles": "logFiles",
|
||||
@@ -538,7 +537,6 @@ func fieldToGlobalVar(field string) string {
|
||||
"DisableUpdateSettings": "updateSettingsDisabled",
|
||||
"EnableCapture": "captureEnabled",
|
||||
"DisableNetworks": "networksDisabled",
|
||||
"DisableJSONSocket": "jsonSocketDisabled",
|
||||
"ServiceEnvVars": "serviceEnvVars",
|
||||
}
|
||||
if v, ok := m[field]; ok {
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
//go:build !ios && !android
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type socketListener struct {
|
||||
net.Listener
|
||||
network string
|
||||
address string
|
||||
}
|
||||
|
||||
func listenOnAddress(addr string) (*socketListener, error) {
|
||||
network, address, err := parseListenAddress(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if network == "unix" {
|
||||
removeStaleUnixSocket(address)
|
||||
}
|
||||
|
||||
listener, err := net.Listen(network, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &socketListener{Listener: listener, network: network, address: address}, nil
|
||||
}
|
||||
|
||||
func parseListenAddress(addr string) (string, string, error) {
|
||||
network, address, ok := strings.Cut(addr, "://")
|
||||
if !ok || network == "" || address == "" {
|
||||
return "", "", fmt.Errorf("address must be in [unix|tcp]://[path|host:port] format: %q", addr)
|
||||
}
|
||||
|
||||
switch network {
|
||||
case "unix", "tcp":
|
||||
return network, address, nil
|
||||
default:
|
||||
return "", "", fmt.Errorf("unsupported daemon address protocol: %v", network)
|
||||
}
|
||||
}
|
||||
|
||||
func removeStaleUnixSocket(path string) {
|
||||
stat, err := os.Stat(path)
|
||||
if err == nil && !stat.IsDir() {
|
||||
if err := os.Remove(path); err != nil {
|
||||
log.Debugf("remove socket file: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
log.Debugf("stat socket file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func removeStaleUnixSocketForAddress(addr string) {
|
||||
network, address, err := parseListenAddress(addr)
|
||||
if err != nil || network != "unix" {
|
||||
return
|
||||
}
|
||||
removeStaleUnixSocket(address)
|
||||
}
|
||||
|
||||
func (l *socketListener) chmodUnixSocket(description string) error {
|
||||
if l == nil || l.network != "unix" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.Chmod(l.address, 0666); err != nil {
|
||||
return fmt.Errorf("failed setting %s permissions for %s: %w", description, l.address, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -895,6 +895,16 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
|
||||
e.updateManager.SetVersion(autoUpdateSettings.Version, autoUpdateSettings.AlwaysUpdate)
|
||||
}
|
||||
|
||||
// phase times a sync sub-phase: it returns a function that records the elapsed
|
||||
// duration when called. Starting the timer at the call site keeps inter-phase
|
||||
// glue code out of the measurement.
|
||||
func (e *Engine) phase(name string) func() {
|
||||
start := time.Now()
|
||||
return func() {
|
||||
e.clientMetrics.RecordSyncPhase(e.ctx, name, time.Since(start))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
@@ -914,9 +924,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
done := e.phase("netbird_config")
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
@@ -928,11 +940,15 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
done = e.phase("checks")
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("persist")
|
||||
e.persistSyncResponse(update)
|
||||
done()
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
@@ -1357,13 +1373,16 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
done := e.phase("dns_server")
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
done = e.phase("routes_classify")
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
|
||||
@@ -1372,28 +1391,39 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("routes_apply")
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("filtering")
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("dns_forwarder")
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
done()
|
||||
|
||||
// Ingress forward rules
|
||||
done = e.phase("forward_rules")
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
done = e.phase("offline_peers")
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
done()
|
||||
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
@@ -1412,20 +1442,26 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
done = e.phase("removed_peers")
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("modified_peers")
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("added_peers")
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
@@ -1439,8 +1475,10 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
}
|
||||
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
done = e.phase("lazy_exclude")
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
done()
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
|
||||
@@ -120,6 +120,30 @@ func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentI
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordSyncPhase(_ context.Context, agentInfo AgentInfo, phase string, duration time.Duration) {
|
||||
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,phase=%s",
|
||||
agentInfo.DeploymentType.String(),
|
||||
agentInfo.Version,
|
||||
agentInfo.OS,
|
||||
agentInfo.Arch,
|
||||
agentInfo.peerID,
|
||||
phase,
|
||||
)
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.samples = append(m.samples, influxSample{
|
||||
measurement: "netbird_sync_phase",
|
||||
tags: tags,
|
||||
fields: map[string]float64{
|
||||
"duration_seconds": duration.Seconds(),
|
||||
},
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
|
||||
result := "success"
|
||||
if !success {
|
||||
|
||||
@@ -0,0 +1,259 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": []
|
||||
},
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 1,
|
||||
"links": [],
|
||||
"refresh": "",
|
||||
"schemaVersion": 39,
|
||||
"tags": [
|
||||
"netbird",
|
||||
"sync"
|
||||
],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"current": {
|
||||
"text": "All",
|
||||
"value": "$__all"
|
||||
},
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"definition": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"includeAll": true,
|
||||
"label": "version",
|
||||
"multi": true,
|
||||
"name": "version",
|
||||
"query": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"refresh": 2,
|
||||
"type": "query",
|
||||
"allValue": ".*"
|
||||
}
|
||||
]
|
||||
},
|
||||
"time": {
|
||||
"from": "now-2d",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "NetBird Sync Phases (where time goes)",
|
||||
"uid": "netbird-sync-phases",
|
||||
"version": 1,
|
||||
"panels": [
|
||||
{
|
||||
"id": 1,
|
||||
"title": "Time per phase over time (stacked, ms)",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 10,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "bars",
|
||||
"stacking": {
|
||||
"mode": "normal",
|
||||
"group": "A"
|
||||
},
|
||||
"fillOpacity": 80,
|
||||
"lineWidth": 0
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "multi",
|
||||
"sort": "desc"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"phase\"])\n |> group(columns: [\"phase\"])"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "p95 per phase (ms)",
|
||||
"type": "bargauge",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"color": {
|
||||
"mode": "continuous-GrYlRd"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"displayMode": "gradient",
|
||||
"orientation": "horizontal",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"showUnfilled": true
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> sort(columns: [\"_value\"], desc: true)"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"title": "Per-phase stats (ms): mean / p95 / max",
|
||||
"type": "table",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": [
|
||||
{
|
||||
"displayName": "max",
|
||||
"desc": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"transformations": [
|
||||
{
|
||||
"id": "merge",
|
||||
"options": {}
|
||||
}
|
||||
],
|
||||
"targets": [
|
||||
{
|
||||
"refId": "mean",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> mean()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"mean\"})"
|
||||
},
|
||||
{
|
||||
"refId": "p95",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"p95\"})"
|
||||
},
|
||||
{
|
||||
"refId": "max",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> max()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"max\"})"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"title": "Total sync duration (netbird_sync, ms) \u2014 reference",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 21
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "points",
|
||||
"pointSize": 5
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"version\"])\n |> group(columns: [\"version\"])"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -59,6 +59,19 @@ var allowedMeasurements = map[string]measurementSpec{
|
||||
"peer_id": true,
|
||||
},
|
||||
},
|
||||
"netbird_sync_phase": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
},
|
||||
allowedTags: map[string]bool{
|
||||
"deployment_type": true,
|
||||
"version": true,
|
||||
"os": true,
|
||||
"arch": true,
|
||||
"peer_id": true,
|
||||
"phase": true,
|
||||
},
|
||||
},
|
||||
"netbird_login": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
|
||||
@@ -56,6 +56,9 @@ type metricsImplementation interface {
|
||||
// RecordSyncDuration records how long it took to process a sync message
|
||||
RecordSyncDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration)
|
||||
|
||||
// RecordSyncPhase records how long a single sub-phase of sync processing took
|
||||
RecordSyncPhase(ctx context.Context, agentInfo AgentInfo, phase string, duration time.Duration)
|
||||
|
||||
// RecordLoginDuration records how long the login to management took
|
||||
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
|
||||
|
||||
@@ -127,6 +130,18 @@ func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Du
|
||||
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
|
||||
}
|
||||
|
||||
// RecordSyncPhase records the duration of a single sub-phase of sync processing
|
||||
func (c *ClientMetrics) RecordSyncPhase(ctx context.Context, phase string, duration time.Duration) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.RLock()
|
||||
agentInfo := c.agentInfo
|
||||
c.mu.RUnlock()
|
||||
|
||||
c.impl.RecordSyncPhase(ctx, agentInfo, phase, duration)
|
||||
}
|
||||
|
||||
// RecordLoginDuration records how long the login to management server took
|
||||
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
|
||||
if c == nil {
|
||||
|
||||
@@ -70,6 +70,9 @@ func (m *mockMetrics) RecordConnectionStages(_ context.Context, _ AgentInfo, _ s
|
||||
func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordSyncPhase(_ context.Context, _ AgentInfo, _ string, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -12,11 +12,5 @@ script_path=$(dirname "$(realpath "$0")")
|
||||
cd "$script_path"
|
||||
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.6
|
||||
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.6.1
|
||||
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v2.26.3
|
||||
protoc -I ./ ./daemon.proto \
|
||||
--go_out=../ \
|
||||
--go-grpc_out=../ \
|
||||
--grpc-gateway_out=../ \
|
||||
--grpc-gateway_opt=generate_unbound_methods=true \
|
||||
--experimental_allow_proto3_optional
|
||||
protoc -I ./ ./daemon.proto --go_out=../ --go-grpc_out=../ --experimental_allow_proto3_optional
|
||||
cd "$old_pwd"
|
||||
|
||||
2
go.mod
2
go.mod
@@ -66,7 +66,6 @@ require (
|
||||
github.com/google/nftables v0.3.0
|
||||
github.com/gopacket/gopacket v1.4.0
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.2-0.20240212192251-757544f21357
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/base62 v0.1.2
|
||||
github.com/hashicorp/go-version v1.7.0
|
||||
@@ -331,7 +330,6 @@ require (
|
||||
golang.org/x/text v0.36.0 // indirect
|
||||
golang.org/x/tools v0.43.0 // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
|
||||
Reference in New Issue
Block a user