package client import ( "context" "errors" "fmt" "io" "os" "strconv" "sync" "time" "google.golang.org/grpc/codes" gstatus "google.golang.org/grpc/status" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" nbgrpc "github.com/netbirdio/netbird/client/grpc" "github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/encryption" "github.com/netbirdio/netbird/shared/management/domain" "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/util/wsproxy" ) const ConnectTimeout = 10 * time.Second const ( // EnvMaxRecvMsgSize overrides the default gRPC max receive message size (4 MB) // for the management client connection. Value is in bytes. EnvMaxRecvMsgSize = "NB_MANAGEMENT_GRPC_MAX_MSG_SIZE" errMsgMgmtPublicKey = "failed getting Management Service public key: %s" errMsgNoMgmtConnection = "no connection to management" ) // ConnStateNotifier is a wrapper interface of the status recorders type ConnStateNotifier interface { MarkManagementDisconnected(error) MarkManagementConnected() } type GrpcClient struct { key wgtypes.Key realClient proto.ManagementServiceClient ctx context.Context conn *grpc.ClientConn connStateCallback ConnStateNotifier connStateCallbackLock sync.RWMutex serverURL string } type ExposeRequest struct { NamePrefix string Domain string Port uint16 Protocol int Pin string Password string UserGroups []string ListenPort uint16 } type ExposeResponse struct { ServiceName string Domain string ServiceURL string PortAutoAssigned bool } // MaxRecvMsgSize returns the configured max gRPC receive message size from // the environment, or 0 if unset (which uses the gRPC default of 4 MB). func MaxRecvMsgSize() int { val := os.Getenv(EnvMaxRecvMsgSize) if val == "" { return 0 } size, err := strconv.Atoi(val) if err != nil { log.Warnf("invalid %s value %q, using default: %v", EnvMaxRecvMsgSize, val, err) return 0 } if size <= 0 { log.Warnf("invalid %s value %d, must be positive, using default", EnvMaxRecvMsgSize, size) return 0 } return size } // NewClient creates a new client to Management service func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) { var conn *grpc.ClientConn var extraOpts []grpc.DialOption if maxSize := MaxRecvMsgSize(); maxSize > 0 { extraOpts = append(extraOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxSize))) log.Infof("management gRPC max receive message size set to %d bytes", maxSize) } operation := func() error { var err error conn, err = nbgrpc.CreateConnection(ctx, addr, tlsEnabled, wsproxy.ManagementComponent, extraOpts...) if err != nil { return fmt.Errorf("create connection: %w", err) } return nil } err := backoff.Retry(operation, nbgrpc.Backoff(ctx)) if err != nil { log.Errorf("failed creating connection to Management Service: %v", err) return nil, err } realClient := proto.NewManagementServiceClient(conn) return &GrpcClient{ key: ourPrivateKey, realClient: realClient, ctx: ctx, conn: conn, connStateCallbackLock: sync.RWMutex{}, serverURL: addr, }, nil } // GetServerURL returns the management server URL func (c *GrpcClient) GetServerURL() string { return c.serverURL } // Close closes connection to the Management Service func (c *GrpcClient) Close() error { return c.conn.Close() } // SetConnStateListener set the ConnStateNotifier func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) { c.connStateCallbackLock.Lock() defer c.connStateCallbackLock.Unlock() c.connStateCallback = notifier } // defaultBackoff is a basic backoff mechanism for general issues func defaultBackoff(ctx context.Context) backoff.BackOff { return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, RandomizationFactor: 1, Multiplier: 1.7, MaxInterval: 10 * time.Second, MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) } // ready indicates whether the client is okay and ready to be used // for now it just checks whether gRPC connection to the service is ready func (c *GrpcClient) ready() bool { return c.conn.GetState() == connectivity.Ready || c.conn.GetState() == connectivity.Idle } // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages // Blocking request. The result will be sent via msgHandler callback function func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error { return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error { return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler) }) } // Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages // Blocking request. The result will be sent via msgHandler callback function func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error { return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error { return c.handleJobStream(ctx, serverPubKey, msgHandler) }) } // withMgmtStream runs a streaming operation against the ManagementService // It takes care of retries, connection readiness, and fetching server public key. func (c *GrpcClient) withMgmtStream( ctx context.Context, handler func(ctx context.Context, serverPubKey wgtypes.Key) error, ) error { backOff := defaultBackoff(ctx) operation := func() error { log.Debugf("management connection state %v", c.conn.GetState()) connState := c.conn.GetState() if connState == connectivity.Shutdown { return backoff.Permanent(fmt.Errorf("connection to management has been shut down")) } else if !(connState == connectivity.Ready || connState == connectivity.Idle) { c.conn.WaitForStateChange(ctx, connState) return fmt.Errorf("connection to management is not ready and in %s state", connState) } serverPubKey, err := c.getServerPublicKey() if err != nil { log.Debugf(errMsgMgmtPublicKey, err) return err } return handler(ctx, *serverPubKey) } err := backoff.Retry(operation, backOff) if err != nil { log.Warnf("exiting the Management service connection retry loop due to the unrecoverable error: %s", err) } return err } func (c *GrpcClient) handleJobStream( ctx context.Context, serverPubKey wgtypes.Key, msgHandler func(msg *proto.JobRequest) *proto.JobResponse, ) error { ctx, cancelStream := context.WithCancel(ctx) defer cancelStream() stream, err := c.realClient.Job(ctx) if err != nil { log.Errorf("failed to open job stream: %v", err) return err } // Handshake with the server if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil { return err } log.Debug("job stream handshake sent successfully") // Main loop: receive, process, respond for { jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey) if err != nil { if s, ok := gstatus.FromError(err); ok { switch s.Code() { case codes.PermissionDenied: c.notifyDisconnected(err) return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer case codes.Canceled: log.Debugf("management connection context has been canceled, this usually indicates shutdown") return err case codes.Unimplemented: log.Warn("Job feature is not supported by the current management server version. " + "Please update the management service to use this feature.") return nil default: c.notifyDisconnected(err) log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) return err } } else { // non-gRPC error c.notifyDisconnected(err) log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) return err } } if jobReq == nil || len(jobReq.ID) == 0 { log.Debug("received unknown or empty job request, skipping") continue } log.Infof("received a new job from the management server (ID: %s)", jobReq.ID) jobResp := c.processJobRequest(ctx, jobReq, msgHandler) if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil { return err } } } // sendHandshake sends the initial handshake message func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error { handshakeReq := &proto.JobRequest{ ID: []byte(uuid.New().String()), } encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq) if err != nil { log.Errorf("failed to encrypt handshake message: %v", err) return err } return stream.Send(&proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encHello, }) } // receiveJobRequest waits for and decrypts a job request func (c *GrpcClient) receiveJobRequest( ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key, ) (*proto.JobRequest, error) { encryptedMsg, err := stream.Recv() if err != nil { return nil, err } jobReq := &proto.JobRequest{} if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil { log.Warnf("failed to decrypt job request: %v", err) return nil, err } return jobReq, nil } // processJobRequest executes the handler and ensures a valid response func (c *GrpcClient) processJobRequest( ctx context.Context, jobReq *proto.JobRequest, msgHandler func(msg *proto.JobRequest) *proto.JobResponse, ) *proto.JobResponse { jobResp := msgHandler(jobReq) if jobResp == nil { jobResp = &proto.JobResponse{ ID: jobReq.ID, Status: proto.JobStatus_failed, Reason: []byte("handler returned nil response"), } log.Warnf("job handler returned nil for job %s", string(jobReq.ID)) } return jobResp } // sendJobResponse encrypts and sends a job response func (c *GrpcClient) sendJobResponse( ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key, resp *proto.JobResponse, ) error { encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp) if err != nil { log.Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err) return err } if err := stream.Send(&proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encResp, }); err != nil { log.Errorf("failed to send job response for job %s: %v", string(resp.ID), err) return err } log.Infof("job response sent for job %s (status: %s)", string(resp.ID), resp.Status.String()) return nil } func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error { ctx, cancelStream := context.WithCancel(ctx) defer cancelStream() stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo) if err != nil { log.Debugf("failed to open Management Service stream: %s", err) if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer } return err } log.Infof("connected to the Management Service stream") c.notifyConnected() // blocking until error err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler) if err != nil { c.notifyDisconnected(err) if s, ok := gstatus.FromError(err); ok { switch s.Code() { case codes.PermissionDenied: return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer case codes.Canceled: log.Debugf("management connection context has been canceled, this usually indicates shutdown") return nil default: log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) return err } } else { // non-gRPC error log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) return err } } return nil } // GetNetworkMap return with the network map func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error) { serverPubKey, err := c.getServerPublicKey() if err != nil { log.Debugf("failed getting Management Service public key: %s", err) return nil, err } ctx, cancelStream := context.WithCancel(c.ctx) defer cancelStream() stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo) if err != nil { log.Debugf("failed to open Management Service stream: %s", err) return nil, err } defer func() { _ = stream.CloseSend() }() update, err := stream.Recv() if err == io.EOF { log.Debugf("Management stream has been closed by server: %s", err) return nil, err } if err != nil { log.Debugf("disconnected from Management Service sync stream: %v", err) return nil, err } decryptedResp := &proto.SyncResponse{} err = encryption.DecryptMessage(*serverPubKey, c.key, update.Body, decryptedResp) if err != nil { log.Errorf("failed decrypting update message from Management Service: %s", err) return nil, err } if decryptedResp.GetNetworkMap() == nil { return nil, fmt.Errorf("invalid msg, required network map") } return decryptedResp.GetNetworkMap(), nil } func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) { req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)} myPrivateKey := c.key myPublicKey := myPrivateKey.PublicKey() encryptedReq, err := encryption.EncryptMessage(serverPubKey, myPrivateKey, req) if err != nil { log.Errorf("failed encrypting message: %s", err) return nil, err } syncReq := &proto.EncryptedMessage{WgPubKey: myPublicKey.String(), Body: encryptedReq} sync, err := c.realClient.Sync(ctx, syncReq) if err != nil { return nil, err } return sync, nil } func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error { for { update, err := stream.Recv() if err == io.EOF { log.Debugf("Management stream has been closed by server: %s", err) return err } if err != nil { log.Debugf("disconnected from Management Service sync stream: %v", err) return err } log.Debugf("got an update message from Management Service") decryptedResp := &proto.SyncResponse{} err = encryption.DecryptMessage(serverPubKey, c.key, update.Body, decryptedResp) if err != nil { log.Errorf("failed decrypting update message from Management Service: %s", err) return err } if err := msgHandler(decryptedResp); err != nil { log.Errorf("failed handling an update message received from Management Service: %v", err.Error()) } } } // HealthCheck actively probes the management server and returns an error if unreachable. // Used to validate connectivity before committing configuration changes. func (c *GrpcClient) HealthCheck() error { if !c.ready() { return errors.New(errMsgNoMgmtConnection) } _, err := c.getServerPublicKey() return err } // getServerPublicKey fetches the server's WireGuard public key. func (c *GrpcClient) getServerPublicKey() (*wgtypes.Key, error) { mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) if err != nil { return nil, fmt.Errorf("failed getting Management Service public key: %w", err) } serverKey, err := wgtypes.ParseKey(resp.Key) if err != nil { return nil, err } return &serverKey, nil } // IsHealthy returns the current connection status without blocking. // Used by the engine to monitor connectivity in the background. func (c *GrpcClient) IsHealthy() bool { switch c.conn.GetState() { case connectivity.TransientFailure: return false case connectivity.Connecting: return true case connectivity.Shutdown: return true case connectivity.Idle: case connectivity.Ready: } ctx, cancel := context.WithTimeout(c.ctx, 1*time.Second) defer cancel() _, err := c.realClient.GetServerKey(ctx, &proto.Empty{}) if err != nil { c.notifyDisconnected(err) log.Warnf("health check returned: %s", err) return false } c.notifyConnected() return true } func (c *GrpcClient) login(req *proto.LoginRequest) (*proto.LoginResponse, error) { if !c.ready() { return nil, errors.New(errMsgNoMgmtConnection) } serverKey, err := c.getServerPublicKey() if err != nil { return nil, err } loginReq, err := encryption.EncryptMessage(*serverKey, c.key, req) if err != nil { log.Errorf("failed to encrypt message: %s", err) return nil, err } var resp *proto.EncryptedMessage operation := func() error { mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout) defer cancel() var err error resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: loginReq, }) if err != nil { // retry only on context canceled if s, ok := gstatus.FromError(err); ok && s.Code() == codes.Canceled { return err } return backoff.Permanent(err) } return nil } err = backoff.Retry(operation, nbgrpc.Backoff(c.ctx)) if err != nil { log.Errorf("failed to login to Management Service: %v", err) return nil, err } loginResp := &proto.LoginResponse{} err = encryption.DecryptMessage(*serverKey, c.key, resp.Body, loginResp) if err != nil { log.Errorf("failed to decrypt login response: %s", err) return nil, err } return loginResp, nil } // Register registers peer on Management Server. It actually calls a Login endpoint with a provided setup key // Takes care of encrypting and decrypting messages. // This method will also collect system info and send it with the request (e.g. hostname, os, etc) func (c *GrpcClient) Register(setupKey string, jwtToken string, sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) { keys := &proto.PeerKeys{ SshPubKey: pubSSHKey, WgPubKey: []byte(c.key.PublicKey().String()), } return c.login(&proto.LoginRequest{SetupKey: setupKey, Meta: infoToMetaData(sysInfo), JwtToken: jwtToken, PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()}) } // Login attempts login to Management Server. Takes care of encrypting and decrypting messages. func (c *GrpcClient) Login(sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) { keys := &proto.PeerKeys{ SshPubKey: pubSSHKey, WgPubKey: []byte(c.key.PublicKey().String()), } return c.login(&proto.LoginRequest{Meta: infoToMetaData(sysInfo), PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()}) } // GetDeviceAuthorizationFlow returns a device authorization flow information. // It also takes care of encrypting and decrypting messages. func (c *GrpcClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) { if !c.ready() { return nil, fmt.Errorf("no connection to management in order to get device authorization flow") } serverKey, err := c.getServerPublicKey() if err != nil { return nil, err } mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) defer cancel() message := &proto.DeviceAuthorizationFlowRequest{} encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message) if err != nil { return nil, err } resp, err := c.realClient.GetDeviceAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encryptedMSG}, ) if err != nil { return nil, err } flowInfoResp := &proto.DeviceAuthorizationFlow{} err = encryption.DecryptMessage(*serverKey, c.key, resp.Body, flowInfoResp) if err != nil { errWithMSG := fmt.Errorf("failed to decrypt device authorization flow message: %s", err) log.Error(errWithMSG) return nil, errWithMSG } return flowInfoResp, nil } // GetPKCEAuthorizationFlow returns a pkce authorization flow information. // It also takes care of encrypting and decrypting messages. func (c *GrpcClient) GetPKCEAuthorizationFlow() (*proto.PKCEAuthorizationFlow, error) { if !c.ready() { return nil, fmt.Errorf("no connection to management in order to get pkce authorization flow") } serverKey, err := c.getServerPublicKey() if err != nil { return nil, err } mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2) defer cancel() message := &proto.PKCEAuthorizationFlowRequest{} encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message) if err != nil { return nil, err } resp, err := c.realClient.GetPKCEAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encryptedMSG, }) if err != nil { return nil, err } flowInfoResp := &proto.PKCEAuthorizationFlow{} err = encryption.DecryptMessage(*serverKey, c.key, resp.Body, flowInfoResp) if err != nil { errWithMSG := fmt.Errorf("failed to decrypt pkce authorization flow message: %s", err) log.Error(errWithMSG) return nil, errWithMSG } return flowInfoResp, nil } // SyncMeta sends updated system metadata to the Management Service. // It should be used if there is changes on peer posture check after initial sync. func (c *GrpcClient) SyncMeta(sysInfo *system.Info) error { if !c.ready() { return errors.New(errMsgNoMgmtConnection) } serverPubKey, err := c.getServerPublicKey() if err != nil { log.Debugf(errMsgMgmtPublicKey, err) return err } syncMetaReq, err := encryption.EncryptMessage(*serverPubKey, c.key, &proto.SyncMetaRequest{Meta: infoToMetaData(sysInfo)}) if err != nil { log.Errorf("failed to encrypt message: %s", err) return err } mgmCtx, cancel := context.WithTimeout(c.ctx, ConnectTimeout) defer cancel() _, err = c.realClient.SyncMeta(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: syncMetaReq, }) return err } func (c *GrpcClient) notifyDisconnected(err error) { c.connStateCallbackLock.RLock() defer c.connStateCallbackLock.RUnlock() if c.connStateCallback == nil { return } c.connStateCallback.MarkManagementDisconnected(err) } func (c *GrpcClient) notifyConnected() { c.connStateCallbackLock.RLock() defer c.connStateCallbackLock.RUnlock() if c.connStateCallback == nil { return } c.connStateCallback.MarkManagementConnected() } func (c *GrpcClient) Logout() error { serverKey, err := c.getServerPublicKey() if err != nil { return fmt.Errorf("get server public key: %w", err) } mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*15) defer cancel() message := &proto.Empty{} encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message) if err != nil { return fmt.Errorf("encrypt logout message: %w", err) } _, err = c.realClient.Logout(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encryptedMSG, }) if err != nil { return fmt.Errorf("logout: %w", err) } return nil } // CreateExpose calls the management server to create a new expose service. func (c *GrpcClient) CreateExpose(ctx context.Context, req ExposeRequest) (*ExposeResponse, error) { serverPubKey, err := c.getServerPublicKey() if err != nil { return nil, err } protoReq, err := toProtoExposeServiceRequest(req) if err != nil { return nil, err } encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, protoReq) if err != nil { return nil, fmt.Errorf("encrypt create expose request: %w", err) } mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) defer cancel() resp, err := c.realClient.CreateExpose(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encReq, }) if err != nil { return nil, err } exposeResp := &proto.ExposeServiceResponse{} if err := encryption.DecryptMessage(*serverPubKey, c.key, resp.Body, exposeResp); err != nil { return nil, fmt.Errorf("decrypt create expose response: %w", err) } return fromProtoExposeResponse(exposeResp), nil } // RenewExpose extends the TTL of an active expose session on the management server. func (c *GrpcClient) RenewExpose(ctx context.Context, domain string) error { serverPubKey, err := c.getServerPublicKey() if err != nil { return err } req := &proto.RenewExposeRequest{Domain: domain} encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, req) if err != nil { return fmt.Errorf("encrypt renew expose request: %w", err) } mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) defer cancel() _, err = c.realClient.RenewExpose(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encReq, }) return err } // StopExpose terminates an active expose session on the management server. func (c *GrpcClient) StopExpose(ctx context.Context, domain string) error { serverPubKey, err := c.getServerPublicKey() if err != nil { return err } req := &proto.StopExposeRequest{Domain: domain} encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, req) if err != nil { return fmt.Errorf("encrypt stop expose request: %w", err) } mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) defer cancel() _, err = c.realClient.StopExpose(mgmCtx, &proto.EncryptedMessage{ WgPubKey: c.key.PublicKey().String(), Body: encReq, }) return err } func fromProtoExposeResponse(resp *proto.ExposeServiceResponse) *ExposeResponse { return &ExposeResponse{ ServiceName: resp.ServiceName, Domain: resp.Domain, ServiceURL: resp.ServiceUrl, PortAutoAssigned: resp.PortAutoAssigned, } } func toProtoExposeServiceRequest(req ExposeRequest) (*proto.ExposeServiceRequest, error) { var protocol proto.ExposeProtocol switch req.Protocol { case int(proto.ExposeProtocol_EXPOSE_HTTP): protocol = proto.ExposeProtocol_EXPOSE_HTTP case int(proto.ExposeProtocol_EXPOSE_HTTPS): protocol = proto.ExposeProtocol_EXPOSE_HTTPS case int(proto.ExposeProtocol_EXPOSE_TCP): protocol = proto.ExposeProtocol_EXPOSE_TCP case int(proto.ExposeProtocol_EXPOSE_UDP): protocol = proto.ExposeProtocol_EXPOSE_UDP case int(proto.ExposeProtocol_EXPOSE_TLS): protocol = proto.ExposeProtocol_EXPOSE_TLS default: return nil, fmt.Errorf("invalid expose protocol: %d", req.Protocol) } return &proto.ExposeServiceRequest{ NamePrefix: req.NamePrefix, Domain: req.Domain, Port: uint32(req.Port), Protocol: protocol, Pin: req.Pin, Password: req.Password, UserGroups: req.UserGroups, ListenPort: uint32(req.ListenPort), }, nil } func infoToMetaData(info *system.Info) *proto.PeerSystemMeta { if info == nil { return nil } addresses := make([]*proto.NetworkAddress, 0, len(info.NetworkAddresses)) for _, addr := range info.NetworkAddresses { addresses = append(addresses, &proto.NetworkAddress{ NetIP: addr.NetIP.String(), Mac: addr.Mac, }) } files := make([]*proto.File, 0, len(info.Files)) for _, file := range info.Files { files = append(files, &proto.File{ Path: file.Path, Exist: file.Exist, ProcessIsRunning: file.ProcessIsRunning, }) } return &proto.PeerSystemMeta{ Hostname: info.Hostname, GoOS: info.GoOS, OS: info.OS, Core: info.OSVersion, OSVersion: info.OSVersion, Platform: info.Platform, Kernel: info.Kernel, NetbirdVersion: info.NetbirdVersion, UiVersion: info.UIVersion, KernelVersion: info.KernelVersion, NetworkAddresses: addresses, SysSerialNumber: info.SystemSerialNumber, SysManufacturer: info.SystemManufacturer, SysProductName: info.SystemProductName, Environment: &proto.Environment{ Cloud: info.Environment.Cloud, Platform: info.Environment.Platform, }, Files: files, Flags: &proto.Flags{ RosenpassEnabled: info.RosenpassEnabled, RosenpassPermissive: info.RosenpassPermissive, ServerSSHAllowed: info.ServerSSHAllowed, DisableClientRoutes: info.DisableClientRoutes, DisableServerRoutes: info.DisableServerRoutes, DisableDNS: info.DisableDNS, DisableFirewall: info.DisableFirewall, BlockLANAccess: info.BlockLANAccess, BlockInbound: info.BlockInbound, LazyConnectionEnabled: info.LazyConnectionEnabled, }, } }