mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-31 21:19:55 +00:00
* add SSO session extend flow (management)
Adds the management-server half of the SSO session-extension feature:
- New ExtendAuthSession gRPC RPC that refreshes a peer's session expiry
using a fresh JWT, validated through the same pipeline as Login but
without tearing down the tunnel or redoing the NetworkMap sync.
- Per-peer SessionExpiresAt timestamp on every LoginResponse and
SyncResponse so connected clients learn the deadline on the existing
long-lived stream, and admin-side changes (toggling expiration,
changing the expiration window) reach every peer within seconds.
- SessionExpiresAt(...) helper on Peer that derives the absolute UTC
deadline from LastLogin + the account-level PeerLoginExpiration
setting, returning zero when the peer is not SSO-tracked or expiration
is disabled.
The matching client-side consumer of these fields lands separately.
* encode SessionExpiresAt as 3-state on the wire
Previously the `sessionExpiresAt` field on LoginResponse, SyncResponse
and ExtendAuthSessionResponse was 2-state: a valid timestamp meant
"new deadline", and nil meant "clear". That conflated two distinct
meanings — "no info in this snapshot" vs "expiry is explicitly off /
peer is not SSO-tracked" — so a Sync push that legitimately couldn't
compute the deadline (settings lookup failed) would silently clear the
client's anchor and lose the warning window.
Three states now, encoded on the same field number (no .proto schema
churn — only comments and the server-side encoder change):
- nil pointer (field absent) → "no info"; client preserves anchor
- &Timestamp{} (seconds=0, nanos=0) → explicit "disabled / not SSO"
sentinel; client clears
- valid timestamp → new absolute UTC deadline
A new encodeSessionExpiresAt helper centralises the zero/non-zero
encoding and is shared by the Sync, Login and ExtendAuthSession
builders. The Sync builder still emits nil when settings are missing.
Login and ExtendAuthSession always carry an authoritative value.
The matching client-side decoder lands on feature/session-extend.
* add UserExtendedPeerSession activity event
ExtendAuthSession previously reused UserLoggedInPeer for its audit
record, which conflated two distinct user actions: a full interactive
SSO login (tunnel re-established, network map resync) versus an
in-place deadline refresh (tunnel untouched). Auditors reading the log
couldn't tell which one happened, and downstream dashboards/alerts on
"login" volume were polluted by routine extends.
Adds a dedicated UserExtendedPeerSession Activity (code 125,
"user.peer.session.extend") and switches ExtendPeerSession over to it.
The peer-extend audit trail is now distinguishable from interactive
logins.
* make ExtendAuthSession JWT-retry backoff cancellable
Skip the retry log and 200ms wait on the final attempt, and replace the
uncancellable time.Sleep with a select on time.After/ctx.Done so an
upstream cancellation aborts the wait instead of running it to
completion.
1014 lines
30 KiB
Go
1014 lines
30 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
gstatus "google.golang.org/grpc/status"
|
|
|
|
"github.com/cenkalti/backoff/v4"
|
|
"github.com/google/uuid"
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
|
|
nbgrpc "github.com/netbirdio/netbird/client/grpc"
|
|
"github.com/netbirdio/netbird/client/system"
|
|
"github.com/netbirdio/netbird/encryption"
|
|
"github.com/netbirdio/netbird/shared/management/domain"
|
|
"github.com/netbirdio/netbird/shared/management/proto"
|
|
"github.com/netbirdio/netbird/util/wsproxy"
|
|
)
|
|
|
|
const ConnectTimeout = 10 * time.Second
|
|
|
|
const healthCheckTimeout = 5 * time.Second
|
|
|
|
const (
|
|
// EnvMaxRecvMsgSize overrides the default gRPC max receive message size (4 MB)
|
|
// for the management client connection. Value is in bytes.
|
|
EnvMaxRecvMsgSize = "NB_MANAGEMENT_GRPC_MAX_MSG_SIZE"
|
|
|
|
errMsgMgmtPublicKey = "failed getting Management Service public key: %s"
|
|
errMsgNoMgmtConnection = "no connection to management"
|
|
)
|
|
|
|
// ConnStateNotifier is the hook through which GrpcClient reports management
// connectivity changes to a status recorder.
type ConnStateNotifier interface {
	// MarkManagementConnected is invoked after a successful stream open or
	// health probe.
	MarkManagementConnected()
	// MarkManagementDisconnected is invoked with the error that caused the
	// management connection to be considered lost.
	MarkManagementDisconnected(error)
}
|
|
|
|
type GrpcClient struct {
|
|
key wgtypes.Key
|
|
realClient proto.ManagementServiceClient
|
|
ctx context.Context
|
|
conn *grpc.ClientConn
|
|
connStateCallback ConnStateNotifier
|
|
connStateCallbackLock sync.RWMutex
|
|
serverURL string
|
|
}
|
|
|
|
// ExposeRequest describes a service the peer asks the Management Service to
// expose. It is converted to proto.ExposeServiceRequest before being sent.
type ExposeRequest struct {
	NamePrefix string
	Domain     string
	// Port is forwarded as ExposeServiceRequest.Port.
	Port uint16
	// Protocol holds the numeric value of a proto.ExposeProtocol constant.
	Protocol   int
	Pin        string
	Password   string
	UserGroups []string
	// ListenPort is forwarded as ExposeServiceRequest.ListenPort.
	ListenPort uint16
}

// ExposeResponse is the client-side view of proto.ExposeServiceResponse.
type ExposeResponse struct {
	ServiceName string
	Domain      string
	ServiceURL  string
	// PortAutoAssigned mirrors the server's PortAutoAssigned flag.
	PortAutoAssigned bool
}
|
|
|
|
// MaxRecvMsgSize reads EnvMaxRecvMsgSize and returns it as a byte count.
// It returns 0, meaning "keep the gRPC default of 4 MB", when the variable is
// unset or empty, is not an integer, or is not strictly positive. The last
// two cases are logged as warnings.
func MaxRecvMsgSize() int {
	raw := os.Getenv(EnvMaxRecvMsgSize)
	if raw == "" {
		return 0
	}

	parsed, parseErr := strconv.Atoi(raw)
	switch {
	case parseErr != nil:
		log.Warnf("invalid %s value %q, using default: %v", EnvMaxRecvMsgSize, raw, parseErr)
		return 0
	case parsed <= 0:
		log.Warnf("invalid %s value %d, must be positive, using default", EnvMaxRecvMsgSize, parsed)
		return 0
	default:
		return parsed
	}
}
|
|
|
|
// NewClient dials the Management Service at addr and returns a client bound
// to ourPrivateKey. Dialing is retried with nbgrpc.Backoff(ctx). If
// MaxRecvMsgSize reports a positive limit, it is applied as a default call
// option. The dial error is returned when every retry fails.
func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) {
	var dialOpts []grpc.DialOption
	if limit := MaxRecvMsgSize(); limit > 0 {
		dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(limit)))
		log.Infof("management gRPC max receive message size set to %d bytes", limit)
	}

	var clientConn *grpc.ClientConn
	dial := func() error {
		created, dialErr := nbgrpc.CreateConnection(ctx, addr, tlsEnabled, wsproxy.ManagementComponent, dialOpts...)
		clientConn = created
		if dialErr != nil {
			return fmt.Errorf("create connection: %w", dialErr)
		}
		return nil
	}

	if err := backoff.Retry(dial, nbgrpc.Backoff(ctx)); err != nil {
		log.Errorf("failed creating connection to Management Service: %v", err)
		return nil, err
	}

	stub := proto.NewManagementServiceClient(clientConn)

	// The zero value of connStateCallbackLock is a ready-to-use RWMutex.
	return &GrpcClient{
		key:        ourPrivateKey,
		realClient: stub,
		ctx:        ctx,
		conn:       clientConn,
		serverURL:  addr,
	}, nil
}
|
|
|
|
// GetServerURL returns the management server URL
|
|
func (c *GrpcClient) GetServerURL() string {
|
|
return c.serverURL
|
|
}
|
|
|
|
// Close closes connection to the Management Service
|
|
func (c *GrpcClient) Close() error {
|
|
return c.conn.Close()
|
|
}
|
|
|
|
// SetConnStateListener set the ConnStateNotifier
|
|
func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) {
|
|
c.connStateCallbackLock.Lock()
|
|
defer c.connStateCallbackLock.Unlock()
|
|
c.connStateCallback = notifier
|
|
}
|
|
|
|
// defaultBackoff is a basic backoff mechanism for general issues
|
|
func defaultBackoff(ctx context.Context) backoff.BackOff {
|
|
return backoff.WithContext(&backoff.ExponentialBackOff{
|
|
InitialInterval: 800 * time.Millisecond,
|
|
RandomizationFactor: 1,
|
|
Multiplier: 1.7,
|
|
MaxInterval: 10 * time.Second,
|
|
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
|
Stop: backoff.Stop,
|
|
Clock: backoff.SystemClock,
|
|
}, ctx)
|
|
}
|
|
|
|
// ready indicates whether the client is okay and ready to be used
|
|
// for now it just checks whether gRPC connection to the service is ready
|
|
func (c *GrpcClient) ready() bool {
|
|
return c.conn.GetState() == connectivity.Ready || c.conn.GetState() == connectivity.Idle
|
|
}
|
|
|
|
// Sync keeps a Sync stream to the Management Service open and passes every
// decrypted SyncResponse to msgHandler. Retries and server-key retrieval are
// handled by withMgmtStream. The call blocks until the retry loop exits.
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
	streamSync := func(streamCtx context.Context, serverPubKey wgtypes.Key) error {
		return c.handleSyncStream(streamCtx, serverPubKey, sysInfo, msgHandler)
	}
	return c.withMgmtStream(ctx, streamSync)
}
|
|
|
|
// Job keeps a Job stream to the Management Service open. Each incoming
// JobRequest goes to msgHandler, and the resulting JobResponse is sent back.
// Retries and server-key retrieval are handled by withMgmtStream. The call
// blocks until the retry loop exits.
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
	streamJobs := func(streamCtx context.Context, serverPubKey wgtypes.Key) error {
		return c.handleJobStream(streamCtx, serverPubKey, msgHandler)
	}
	return c.withMgmtStream(ctx, streamJobs)
}
|
|
|
|
// withMgmtStream runs a streaming operation against the ManagementService
// under defaultBackoff(ctx). Each attempt does the following:
//   - aborts permanently if the connection is Shutdown;
//   - otherwise, if the connection is not Ready/Idle, waits for a state change
//     and returns a retryable error;
//   - fetches the server public key and calls handler with it.
//
// The handler's error is returned to backoff unchanged, so a
// backoff.Permanent error stops the loop. The final error is logged and
// returned.
func (c *GrpcClient) withMgmtStream(
	ctx context.Context,
	handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
) error {
	operation := func() error {
		// Take one snapshot so the logged state and the branch below agree.
		connState := c.conn.GetState()
		log.Debugf("management connection state %v", connState)

		switch connState {
		case connectivity.Shutdown:
			return backoff.Permanent(fmt.Errorf("connection to management has been shut down"))
		case connectivity.Ready, connectivity.Idle:
			// Usable; continue with the key fetch below.
		default:
			// Block until the state moves (or ctx ends), then let backoff retry.
			c.conn.WaitForStateChange(ctx, connState)
			return fmt.Errorf("connection to management is not ready and in %s state", connState)
		}

		serverPubKey, err := c.getServerPublicKey()
		if err != nil {
			log.Debugf(errMsgMgmtPublicKey, err)
			return err
		}

		return handler(ctx, *serverPubKey)
	}

	err := backoff.Retry(operation, defaultBackoff(ctx))
	if err != nil {
		log.Warnf("exiting the Management service connection retry loop due to the unrecoverable error: %s", err)
	}

	return err
}
|
|
|
|
// handleJobStream opens a Job stream, sends the handshake, and then serves
// jobs until the stream fails.
//
// Return values:
//   - nil when the stream context is cancelled (treated as shutdown) or when
//     the server reports codes.Unimplemented;
//   - backoff.Permanent on codes.PermissionDenied, after notifying the
//     listener of the disconnect;
//   - the raw error otherwise, which leaves it retryable for withMgmtStream.
func (c *GrpcClient) handleJobStream(
	ctx context.Context,
	serverPubKey wgtypes.Key,
	msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) error {
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := c.realClient.Job(streamCtx)
	if err != nil {
		log.Errorf("failed to open job stream: %v", err)
		return err
	}

	if err = c.sendHandshake(streamCtx, stream, serverPubKey); err != nil {
		return err
	}
	log.Debug("job stream handshake sent successfully")

	for {
		jobReq, recvErr := c.receiveJobRequest(streamCtx, stream, serverPubKey)
		if recvErr != nil {
			if streamCtx.Err() != nil {
				log.Debugf("job stream context has been canceled, this usually indicates shutdown")
				return nil
			}

			st, isStatus := gstatus.FromError(recvErr)
			switch {
			case isStatus && st.Code() == codes.PermissionDenied:
				c.notifyDisconnected(recvErr)
				return backoff.Permanent(recvErr) // unrecoverable error, propagate to the upper layer
			case isStatus && st.Code() == codes.Unimplemented:
				log.Warn("Job feature is not supported by the current management server version. " +
					"Please update the management service to use this feature.")
				return nil
			}

			log.Warnf("job stream disconnected, will retry silently. Reason: %v", recvErr)
			return recvErr
		}

		// Skip malformed or empty requests without answering them.
		if jobReq == nil || len(jobReq.ID) == 0 {
			log.Debug("received unknown or empty job request, skipping")
			continue
		}

		log.Infof("received a new job from the management server (ID: %s)", jobReq.ID)
		result := c.processJobRequest(streamCtx, jobReq, msgHandler)
		if sendErr := c.sendJobResponse(streamCtx, stream, serverPubKey, result); sendErr != nil {
			return sendErr
		}
	}
}
|
|
|
|
// sendHandshake opens the Job stream conversation. It sends a JobRequest
// whose ID is a fresh UUID string, encrypted for the server. The ctx
// parameter is currently unused.
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
	hello := &proto.JobRequest{ID: []byte(uuid.New().String())}

	body, err := encryption.EncryptMessage(serverPubKey, c.key, hello)
	if err != nil {
		log.Errorf("failed to encrypt handshake message: %v", err)
		return err
	}

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     body,
	}
	return stream.Send(envelope)
}
|
|
|
|
// receiveJobRequest blocks on stream.Recv and decrypts the result into a
// JobRequest. Recv errors are returned unchanged. Decryption errors are
// logged and returned. The ctx parameter is currently unused.
func (c *GrpcClient) receiveJobRequest(
	ctx context.Context,
	stream proto.ManagementService_JobClient,
	serverPubKey wgtypes.Key,
) (*proto.JobRequest, error) {
	msg, err := stream.Recv()
	if err != nil {
		return nil, err
	}

	req := new(proto.JobRequest)
	if err = encryption.DecryptMessage(serverPubKey, c.key, msg.Body, req); err != nil {
		log.Warnf("failed to decrypt job request: %v", err)
		return nil, err
	}
	return req, nil
}
|
|
|
|
// processJobRequest runs msgHandler on jobReq and never returns nil. If the
// handler returns nil, the result is a failed JobResponse carrying the same
// ID and an explanatory reason. The ctx parameter is currently unused.
func (c *GrpcClient) processJobRequest(
	ctx context.Context,
	jobReq *proto.JobRequest,
	msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) *proto.JobResponse {
	if handled := msgHandler(jobReq); handled != nil {
		return handled
	}

	fallback := &proto.JobResponse{
		ID:     jobReq.ID,
		Status: proto.JobStatus_failed,
		Reason: []byte("handler returned nil response"),
	}
	log.Warnf("job handler returned nil for job %s", string(jobReq.ID))
	return fallback
}
|
|
|
|
// sendJobResponse encrypts resp for the server and writes it to the Job
// stream. Encryption and send failures are logged with the job ID and
// returned. The ctx parameter is currently unused.
func (c *GrpcClient) sendJobResponse(
	ctx context.Context,
	stream proto.ManagementService_JobClient,
	serverPubKey wgtypes.Key,
	resp *proto.JobResponse,
) error {
	payload, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
	if err != nil {
		log.Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
		return err
	}

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     payload,
	}
	if err = stream.Send(envelope); err != nil {
		log.Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
		return err
	}

	log.Infof("job response sent for job %s (status: %s)", string(resp.ID), resp.Status.String())
	return nil
}
|
|
|
|
// handleSyncStream opens a Sync stream and blocks while updates are received.
//
// On exit it classifies the failure:
//   - cancellation of the stream context returns nil (treated as shutdown);
//   - codes.PermissionDenied returns backoff.Permanent;
//   - anything else is returned as a retryable error.
//
// The listener is notified of the connection when the stream opens, and of
// the disconnection when receiving stops with an error.
func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	permissionDenied := func(e error) bool {
		st, ok := gstatus.FromError(e)
		return ok && st.Code() == codes.PermissionDenied
	}

	stream, err := c.connectToSyncStream(streamCtx, serverPubKey, sysInfo)
	if err != nil {
		log.Debugf("failed to open Management Service stream: %s", err)
		if permissionDenied(err) {
			return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
		}
		return err
	}

	log.Infof("connected to the Management Service stream")
	c.notifyConnected()

	// Blocks until the stream fails or is closed.
	err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
	if err == nil {
		return nil
	}

	c.notifyDisconnected(err)
	switch {
	case streamCtx.Err() != nil:
		log.Debugf("management connection context has been canceled, this usually indicates shutdown")
		return nil
	case permissionDenied(err):
		return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
	default:
		log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
		return err
	}
}
|
|
|
|
// GetNetworkMap opens a one-shot Sync stream, reads the first update, and
// returns the NetworkMap it carries.
//
// The stream context is derived from the client context. The stream is
// closed, and its context cancelled, before returning. An error is returned
// when any of the following happens:
//   - the server key cannot be fetched;
//   - the stream cannot be opened or read (including io.EOF);
//   - the body cannot be decrypted;
//   - the first SyncResponse has no NetworkMap.
func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error) {
	serverPubKey, err := c.getServerPublicKey()
	if err != nil {
		// Same log format as the other callers of getServerPublicKey.
		log.Debugf(errMsgMgmtPublicKey, err)
		return nil, err
	}

	ctx, cancelStream := context.WithCancel(c.ctx)
	defer cancelStream()

	stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
	if err != nil {
		log.Debugf("failed to open Management Service stream: %s", err)
		return nil, err
	}
	defer func() {
		// Best effort: only the first message was needed.
		_ = stream.CloseSend()
	}()

	update, err := stream.Recv()
	if err != nil {
		if errors.Is(err, io.EOF) {
			log.Debugf("Management stream has been closed by server: %s", err)
		} else {
			log.Debugf("disconnected from Management Service sync stream: %v", err)
		}
		return nil, err
	}

	decryptedResp := &proto.SyncResponse{}
	if err = encryption.DecryptMessage(*serverPubKey, c.key, update.Body, decryptedResp); err != nil {
		log.Errorf("failed decrypting update message from Management Service: %s", err)
		return nil, err
	}

	networkMap := decryptedResp.GetNetworkMap()
	if networkMap == nil {
		return nil, fmt.Errorf("invalid msg, required network map")
	}

	return networkMap, nil
}
|
|
|
|
// connectToSyncStream encrypts a SyncRequest carrying the peer's system
// metadata and opens the server-streaming Sync RPC under ctx. On failure it
// returns a nil stream.
func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
	request := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}

	payload, err := encryption.EncryptMessage(serverPubKey, c.key, request)
	if err != nil {
		log.Errorf("failed encrypting message: %s", err)
		return nil, err
	}

	stream, err := c.realClient.Sync(ctx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     payload,
	})
	if err != nil {
		return nil, err
	}
	return stream, nil
}
|
|
|
|
// receiveUpdatesEvents reads the Sync stream until Recv or decryption fails.
// Each decrypted SyncResponse is passed to msgHandler. Handler errors are
// logged and do not end the loop. The function returns only with a non-nil
// error; io.EOF means the server closed the stream.
func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
	for {
		update, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				log.Debugf("Management stream has been closed by server: %s", err)
			} else {
				log.Debugf("disconnected from Management Service sync stream: %v", err)
			}
			return err
		}

		log.Debugf("got an update message from Management Service")
		decryptedResp := &proto.SyncResponse{}
		if err := encryption.DecryptMessage(serverPubKey, c.key, update.Body, decryptedResp); err != nil {
			log.Errorf("failed decrypting update message from Management Service: %s", err)
			return err
		}

		// A failing handler does not tear down the stream.
		if err := msgHandler(decryptedResp); err != nil {
			log.Errorf("failed handling an update message received from Management Service: %v", err)
		}
	}
}
|
|
|
|
// HealthCheck actively probes the management server and returns an error if
// it is unreachable. A connection that is not Ready/Idle fails immediately.
// Otherwise the probe is a GetServerKey round trip. It is used to validate
// connectivity before committing configuration changes.
func (c *GrpcClient) HealthCheck() error {
	if c.ready() {
		_, err := c.getServerPublicKey()
		return err
	}
	return errors.New(errMsgNoMgmtConnection)
}
|
|
|
|
// getServerPublicKey asks the Management Service for its WireGuard public key,
// with a 5-second timeout derived from the client context, and parses it.
// RPC errors are wrapped. Parse errors are returned unchanged.
func (c *GrpcClient) getServerPublicKey() (*wgtypes.Key, error) {
	callCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
	defer cancel()

	keyResp, callErr := c.realClient.GetServerKey(callCtx, &proto.Empty{})
	if callErr != nil {
		return nil, fmt.Errorf("failed getting Management Service public key: %w", callErr)
	}

	parsed, parseErr := wgtypes.ParseKey(keyResp.Key)
	if parseErr != nil {
		return nil, parseErr
	}
	return &parsed, nil
}
|
|
|
|
// IsHealthy is used by the engine to monitor connectivity in the background.
//
// TransientFailure reports unhealthy. Connecting and Shutdown report healthy
// without probing. For every other state it issues a GetServerKey probe
// bounded by healthCheckTimeout, notifies the listener of the outcome, and
// returns whether the probe succeeded.
func (c *GrpcClient) IsHealthy() bool {
	switch c.conn.GetState() {
	case connectivity.TransientFailure:
		return false
	case connectivity.Connecting, connectivity.Shutdown:
		// NOTE(review): Shutdown is reported as healthy (behaviour preserved);
		// presumably so that a deliberately closed client is not flagged as
		// disconnected. Confirm that this is intended.
		return true
	}

	// Idle or Ready: actively probe the server.
	probeCtx, cancel := context.WithTimeout(c.ctx, healthCheckTimeout)
	defer cancel()

	if _, err := c.realClient.GetServerKey(probeCtx, &proto.Empty{}); err != nil {
		c.notifyDisconnected(err)
		log.Warnf("health check returned: %s", err)
		return false
	}

	c.notifyConnected()
	return true
}
|
|
|
|
// login encrypts req and calls the Login RPC, then decrypts the reply.
//
// Each attempt gets its own ConnectTimeout deadline. Attempts are retried
// under nbgrpc.Backoff(c.ctx) only when the call fails with codes.Canceled;
// any other error aborts immediately. The call fails fast with
// errMsgNoMgmtConnection when the connection is not Ready/Idle.
func (c *GrpcClient) login(req *proto.LoginRequest) (*proto.LoginResponse, error) {
	if !c.ready() {
		return nil, errors.New(errMsgNoMgmtConnection)
	}

	serverKey, err := c.getServerPublicKey()
	if err != nil {
		return nil, err
	}

	encryptedBody, err := encryption.EncryptMessage(*serverKey, c.key, req)
	if err != nil {
		log.Errorf("failed to encrypt message: %s", err)
		return nil, err
	}

	var encryptedResp *proto.EncryptedMessage
	attempt := func() error {
		attemptCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
		defer cancel()

		r, callErr := c.realClient.Login(attemptCtx, &proto.EncryptedMessage{
			WgPubKey: c.key.PublicKey().String(),
			Body:     encryptedBody,
		})
		encryptedResp = r
		if callErr == nil {
			return nil
		}

		// Only a Canceled status is considered transient and worth retrying.
		if st, ok := gstatus.FromError(callErr); ok && st.Code() == codes.Canceled {
			return callErr
		}
		return backoff.Permanent(callErr)
	}

	if err = backoff.Retry(attempt, nbgrpc.Backoff(c.ctx)); err != nil {
		log.Errorf("failed to login to Management Service: %v", err)
		return nil, err
	}

	loginResp := &proto.LoginResponse{}
	if err = encryption.DecryptMessage(*serverKey, c.key, encryptedResp.Body, loginResp); err != nil {
		log.Errorf("failed to decrypt login response: %s", err)
		return nil, err
	}

	return loginResp, nil
}
|
|
|
|
// Register registers peer on Management Server. It actually calls a Login endpoint with a provided setup key
|
|
// Takes care of encrypting and decrypting messages.
|
|
// This method will also collect system info and send it with the request (e.g. hostname, os, etc)
|
|
func (c *GrpcClient) Register(setupKey string, jwtToken string, sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) {
|
|
keys := &proto.PeerKeys{
|
|
SshPubKey: pubSSHKey,
|
|
WgPubKey: []byte(c.key.PublicKey().String()),
|
|
}
|
|
return c.login(&proto.LoginRequest{SetupKey: setupKey, Meta: infoToMetaData(sysInfo), JwtToken: jwtToken, PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
|
|
}
|
|
|
|
// Login attempts login to Management Server. Takes care of encrypting and decrypting messages.
|
|
func (c *GrpcClient) Login(sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) {
|
|
keys := &proto.PeerKeys{
|
|
SshPubKey: pubSSHKey,
|
|
WgPubKey: []byte(c.key.PublicKey().String()),
|
|
}
|
|
return c.login(&proto.LoginRequest{Meta: infoToMetaData(sysInfo), PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
|
|
}
|
|
|
|
// ExtendAuthSession asks the Management Service to refresh this peer's SSO
// session deadline using a freshly issued JWT. Unlike Login, it is meant to
// leave the tunnel alone: no network map sync and no peer reconnect.
//
// The request (JWT plus system metadata from sysInfo) is encrypted with the
// server's public key. Each attempt is bounded by ConnectTimeout. Attempts
// are retried under nbgrpc.Backoff(c.ctx) only when the call fails with
// codes.Canceled; any other gRPC error aborts immediately. The call fails
// fast with errMsgNoMgmtConnection when the connection is not Ready/Idle.
//
// It returns the decrypted ExtendAuthSessionResponse exactly as the server
// sent it. The function does not interpret the session-expiry field in that
// response; decoding the new deadline is left to the caller.
func (c *GrpcClient) ExtendAuthSession(sysInfo *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error) {
	if !c.ready() {
		return nil, errors.New(errMsgNoMgmtConnection)
	}

	serverKey, err := c.getServerPublicKey()
	if err != nil {
		return nil, err
	}

	reqBody, err := encryption.EncryptMessage(*serverKey, c.key, &proto.ExtendAuthSessionRequest{
		JwtToken: jwtToken,
		Meta:     infoToMetaData(sysInfo),
	})
	if err != nil {
		log.Errorf("failed to encrypt extend auth session message: %s", err)
		return nil, err
	}

	var resp *proto.EncryptedMessage
	operation := func() error {
		mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
		defer cancel()

		var err error
		resp, err = c.realClient.ExtendAuthSession(mgmCtx, &proto.EncryptedMessage{
			WgPubKey: c.key.PublicKey().String(),
			Body:     reqBody,
		})
		if err != nil {
			// Retry only on context canceled; everything else is final.
			if s, ok := gstatus.FromError(err); ok && s.Code() == codes.Canceled {
				return err
			}
			return backoff.Permanent(err)
		}
		return nil
	}

	if err := backoff.Retry(operation, nbgrpc.Backoff(c.ctx)); err != nil {
		log.Errorf("failed to extend auth session on Management Service: %v", err)
		return nil, err
	}

	out := &proto.ExtendAuthSessionResponse{}
	if err := encryption.DecryptMessage(*serverKey, c.key, resp.Body, out); err != nil {
		log.Errorf("failed to decrypt extend auth session response: %s", err)
		return nil, err
	}
	return out, nil
}
|
|
|
|
// GetDeviceAuthorizationFlow fetches the device authorization flow settings
// from the Management Service. The request and the response are encrypted.
// The RPC is bounded by a 2-second timeout derived from the client context.
//
// Decryption failures are logged and returned wrapped with %w, so callers can
// still inspect the underlying error via errors.Is/As.
func (c *GrpcClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) {
	if !c.ready() {
		return nil, errors.New("no connection to management in order to get device authorization flow")
	}

	serverKey, err := c.getServerPublicKey()
	if err != nil {
		return nil, err
	}

	mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2)
	defer cancel()

	message := &proto.DeviceAuthorizationFlowRequest{}
	encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message)
	if err != nil {
		return nil, err
	}

	resp, err := c.realClient.GetDeviceAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encryptedMSG,
	})
	if err != nil {
		return nil, err
	}

	flowInfoResp := &proto.DeviceAuthorizationFlow{}
	if err = encryption.DecryptMessage(*serverKey, c.key, resp.Body, flowInfoResp); err != nil {
		// %w keeps the cause in the error chain; the message text is unchanged.
		errWithMSG := fmt.Errorf("failed to decrypt device authorization flow message: %w", err)
		log.Error(errWithMSG)
		return nil, errWithMSG
	}

	return flowInfoResp, nil
}
|
|
|
|
// GetPKCEAuthorizationFlow fetches the PKCE authorization flow settings from
// the Management Service. The request and the response are encrypted. The
// RPC is bounded by a 2-second timeout derived from the client context.
//
// Decryption failures are logged and returned wrapped with %w, so callers can
// still inspect the underlying error via errors.Is/As.
func (c *GrpcClient) GetPKCEAuthorizationFlow() (*proto.PKCEAuthorizationFlow, error) {
	if !c.ready() {
		return nil, errors.New("no connection to management in order to get pkce authorization flow")
	}

	serverKey, err := c.getServerPublicKey()
	if err != nil {
		return nil, err
	}

	mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2)
	defer cancel()

	message := &proto.PKCEAuthorizationFlowRequest{}
	encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message)
	if err != nil {
		return nil, err
	}

	resp, err := c.realClient.GetPKCEAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encryptedMSG,
	})
	if err != nil {
		return nil, err
	}

	flowInfoResp := &proto.PKCEAuthorizationFlow{}
	if err = encryption.DecryptMessage(*serverKey, c.key, resp.Body, flowInfoResp); err != nil {
		// %w keeps the cause in the error chain; the message text is unchanged.
		errWithMSG := fmt.Errorf("failed to decrypt pkce authorization flow message: %w", err)
		log.Error(errWithMSG)
		return nil, errWithMSG
	}

	return flowInfoResp, nil
}
|
|
|
|
// SyncMeta pushes updated system metadata to the Management Service, for
// example after a posture-relevant change following the initial sync. The RPC
// is bounded by ConnectTimeout, derived from the client context. The server's
// reply is discarded; only the error is returned.
func (c *GrpcClient) SyncMeta(sysInfo *system.Info) error {
	if !c.ready() {
		return errors.New(errMsgNoMgmtConnection)
	}

	serverPubKey, err := c.getServerPublicKey()
	if err != nil {
		log.Debugf(errMsgMgmtPublicKey, err)
		return err
	}

	metaReq := &proto.SyncMetaRequest{Meta: infoToMetaData(sysInfo)}
	body, err := encryption.EncryptMessage(*serverPubKey, c.key, metaReq)
	if err != nil {
		log.Errorf("failed to encrypt message: %s", err)
		return err
	}

	callCtx, cancel := context.WithTimeout(c.ctx, ConnectTimeout)
	defer cancel()

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     body,
	}
	_, err = c.realClient.SyncMeta(callCtx, envelope)
	return err
}
|
|
|
|
func (c *GrpcClient) notifyDisconnected(err error) {
|
|
c.connStateCallbackLock.RLock()
|
|
defer c.connStateCallbackLock.RUnlock()
|
|
|
|
if c.connStateCallback == nil {
|
|
return
|
|
}
|
|
c.connStateCallback.MarkManagementDisconnected(err)
|
|
}
|
|
|
|
func (c *GrpcClient) notifyConnected() {
|
|
c.connStateCallbackLock.RLock()
|
|
defer c.connStateCallbackLock.RUnlock()
|
|
|
|
if c.connStateCallback == nil {
|
|
return
|
|
}
|
|
c.connStateCallback.MarkManagementConnected()
|
|
}
|
|
|
|
// Logout calls the Management Service Logout RPC with an encrypted empty
// body. The call is bounded by a 15-second timeout derived from the client
// context. Unlike most calls here, it does not check connection readiness
// first. Errors are wrapped with the failing step.
func (c *GrpcClient) Logout() error {
	serverKey, err := c.getServerPublicKey()
	if err != nil {
		return fmt.Errorf("get server public key: %w", err)
	}

	callCtx, cancel := context.WithTimeout(c.ctx, time.Second*15)
	defer cancel()

	body, err := encryption.EncryptMessage(*serverKey, c.key, &proto.Empty{})
	if err != nil {
		return fmt.Errorf("encrypt logout message: %w", err)
	}

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     body,
	}
	if _, err = c.realClient.Logout(callCtx, envelope); err != nil {
		return fmt.Errorf("logout: %w", err)
	}
	return nil
}
|
|
|
|
// CreateExpose asks the management server to create a new expose service
// described by req. The RPC is bounded by ConnectTimeout, derived from the
// caller's ctx. An unsupported req.Protocol is rejected only after the server
// key has been fetched.
func (c *GrpcClient) CreateExpose(ctx context.Context, req ExposeRequest) (*ExposeResponse, error) {
	serverPubKey, err := c.getServerPublicKey()
	if err != nil {
		return nil, err
	}

	wireReq, err := toProtoExposeServiceRequest(req)
	if err != nil {
		return nil, err
	}

	payload, err := encryption.EncryptMessage(*serverPubKey, c.key, wireReq)
	if err != nil {
		return nil, fmt.Errorf("encrypt create expose request: %w", err)
	}

	callCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	encrypted, err := c.realClient.CreateExpose(callCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     payload,
	})
	if err != nil {
		return nil, err
	}

	decoded := &proto.ExposeServiceResponse{}
	if err = encryption.DecryptMessage(*serverPubKey, c.key, encrypted.Body, decoded); err != nil {
		return nil, fmt.Errorf("decrypt create expose response: %w", err)
	}

	return fromProtoExposeResponse(decoded), nil
}
|
|
|
|
// RenewExpose extends the TTL of the active expose session for the given
// domain on the management server. The RPC is bounded by ConnectTimeout,
// derived from the caller's ctx, and the server reply is discarded.
func (c *GrpcClient) RenewExpose(ctx context.Context, domain string) error {
	serverPubKey, err := c.getServerPublicKey()
	if err != nil {
		return err
	}

	payload, err := encryption.EncryptMessage(*serverPubKey, c.key, &proto.RenewExposeRequest{Domain: domain})
	if err != nil {
		return fmt.Errorf("encrypt renew expose request: %w", err)
	}

	callCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     payload,
	}
	_, err = c.realClient.RenewExpose(callCtx, envelope)
	return err
}
|
|
|
|
// StopExpose terminates the active expose session for the given domain on
// the management server. The RPC is bounded by ConnectTimeout, derived from
// the caller's ctx, and the server reply is discarded.
func (c *GrpcClient) StopExpose(ctx context.Context, domain string) error {
	serverPubKey, err := c.getServerPublicKey()
	if err != nil {
		return err
	}

	payload, err := encryption.EncryptMessage(*serverPubKey, c.key, &proto.StopExposeRequest{Domain: domain})
	if err != nil {
		return fmt.Errorf("encrypt stop expose request: %w", err)
	}

	callCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	envelope := &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     payload,
	}
	_, err = c.realClient.StopExpose(callCtx, envelope)
	return err
}
|
|
|
|
func fromProtoExposeResponse(resp *proto.ExposeServiceResponse) *ExposeResponse {
|
|
return &ExposeResponse{
|
|
ServiceName: resp.ServiceName,
|
|
Domain: resp.Domain,
|
|
ServiceURL: resp.ServiceUrl,
|
|
PortAutoAssigned: resp.PortAutoAssigned,
|
|
}
|
|
}
|
|
|
|
func toProtoExposeServiceRequest(req ExposeRequest) (*proto.ExposeServiceRequest, error) {
|
|
var protocol proto.ExposeProtocol
|
|
|
|
switch req.Protocol {
|
|
case int(proto.ExposeProtocol_EXPOSE_HTTP):
|
|
protocol = proto.ExposeProtocol_EXPOSE_HTTP
|
|
case int(proto.ExposeProtocol_EXPOSE_HTTPS):
|
|
protocol = proto.ExposeProtocol_EXPOSE_HTTPS
|
|
case int(proto.ExposeProtocol_EXPOSE_TCP):
|
|
protocol = proto.ExposeProtocol_EXPOSE_TCP
|
|
case int(proto.ExposeProtocol_EXPOSE_UDP):
|
|
protocol = proto.ExposeProtocol_EXPOSE_UDP
|
|
case int(proto.ExposeProtocol_EXPOSE_TLS):
|
|
protocol = proto.ExposeProtocol_EXPOSE_TLS
|
|
default:
|
|
return nil, fmt.Errorf("invalid expose protocol: %d", req.Protocol)
|
|
}
|
|
|
|
return &proto.ExposeServiceRequest{
|
|
NamePrefix: req.NamePrefix,
|
|
Domain: req.Domain,
|
|
Port: uint32(req.Port),
|
|
Protocol: protocol,
|
|
Pin: req.Pin,
|
|
Password: req.Password,
|
|
UserGroups: req.UserGroups,
|
|
ListenPort: uint32(req.ListenPort),
|
|
}, nil
|
|
}
|
|
|
|
func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
|
|
if info == nil {
|
|
return nil
|
|
}
|
|
|
|
addresses := make([]*proto.NetworkAddress, 0, len(info.NetworkAddresses))
|
|
for _, addr := range info.NetworkAddresses {
|
|
addresses = append(addresses, &proto.NetworkAddress{
|
|
NetIP: addr.NetIP.String(),
|
|
Mac: addr.Mac,
|
|
})
|
|
}
|
|
|
|
files := make([]*proto.File, 0, len(info.Files))
|
|
for _, file := range info.Files {
|
|
files = append(files, &proto.File{
|
|
Path: file.Path,
|
|
Exist: file.Exist,
|
|
ProcessIsRunning: file.ProcessIsRunning,
|
|
})
|
|
}
|
|
|
|
return &proto.PeerSystemMeta{
|
|
Hostname: info.Hostname,
|
|
GoOS: info.GoOS,
|
|
OS: info.OS,
|
|
Core: info.OSVersion,
|
|
OSVersion: info.OSVersion,
|
|
Platform: info.Platform,
|
|
Kernel: info.Kernel,
|
|
NetbirdVersion: info.NetbirdVersion,
|
|
UiVersion: info.UIVersion,
|
|
KernelVersion: info.KernelVersion,
|
|
NetworkAddresses: addresses,
|
|
SysSerialNumber: info.SystemSerialNumber,
|
|
SysManufacturer: info.SystemManufacturer,
|
|
SysProductName: info.SystemProductName,
|
|
Environment: &proto.Environment{
|
|
Cloud: info.Environment.Cloud,
|
|
Platform: info.Environment.Platform,
|
|
},
|
|
Files: files,
|
|
|
|
Flags: &proto.Flags{
|
|
RosenpassEnabled: info.RosenpassEnabled,
|
|
RosenpassPermissive: info.RosenpassPermissive,
|
|
ServerSSHAllowed: info.ServerSSHAllowed,
|
|
|
|
DisableClientRoutes: info.DisableClientRoutes,
|
|
DisableServerRoutes: info.DisableServerRoutes,
|
|
DisableDNS: info.DisableDNS,
|
|
DisableFirewall: info.DisableFirewall,
|
|
BlockLANAccess: info.BlockLANAccess,
|
|
BlockInbound: info.BlockInbound,
|
|
DisableIPv6: info.DisableIPv6,
|
|
|
|
LazyConnectionEnabled: info.LazyConnectionEnabled,
|
|
},
|
|
|
|
Capabilities: peerCapabilities(*info),
|
|
}
|
|
}
|
|
|
|
// peerCapabilities returns the capabilities this client supports.
|
|
func peerCapabilities(info system.Info) []proto.PeerCapability {
|
|
caps := []proto.PeerCapability{
|
|
proto.PeerCapability_PeerCapabilitySourcePrefixes,
|
|
}
|
|
if !info.DisableIPv6 {
|
|
caps = append(caps, proto.PeerCapability_PeerCapabilityIPv6Overlay)
|
|
}
|
|
return caps
|
|
}
|