From bef3b3392b623113269d968ebfaa1a550b03c7cb Mon Sep 17 00:00:00 2001 From: Mikhail Bragin Date: Sun, 17 Oct 2021 22:15:38 +0200 Subject: [PATCH] fix: graceful shutdown (#134) * fix: graceful shutdown * fix: windows graceful shutdown --- client/cmd/root.go | 4 +++- client/cmd/service_controller.go | 14 +++++++++++- client/cmd/up.go | 18 ++++++---------- client/internal/connection.go | 17 ++++++++------- client/internal/engine.go | 37 ++++++++++++++++++++++++++------ client/internal/wgproxy.go | 4 ++-- management/client/client.go | 14 ++++++------ signal/client/client.go | 14 ++++++------ 8 files changed, 79 insertions(+), 43 deletions(-) diff --git a/client/cmd/root.go b/client/cmd/root.go index 80dd56eda..2c8368cb4 100644 --- a/client/cmd/root.go +++ b/client/cmd/root.go @@ -31,7 +31,8 @@ var ( } // Execution control channel for stopCh signal - stopCh chan int + stopCh chan int + cleanupCh chan struct{} ) // Execute executes the root command. @@ -41,6 +42,7 @@ func Execute() error { func init() { stopCh = make(chan int) + cleanupCh = make(chan struct{}) defaultConfigPath = "/etc/wiretrustee/config.json" defaultLogFile = "/var/log/wiretrustee/client.log" diff --git a/client/cmd/service_controller.go b/client/cmd/service_controller.go index 6d80f049b..816325907 100644 --- a/client/cmd/service_controller.go +++ b/client/cmd/service_controller.go @@ -5,6 +5,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/wiretrustee/wiretrustee/util" + "time" ) func (p *program) Start(s service.Service) error { @@ -15,12 +16,19 @@ func (p *program) Start(s service.Service) error { if err != nil { return } - }() return nil } func (p *program) Stop(s service.Service) error { + stopCh <- 1 + + select { + case <-cleanupCh: + case <-time.After(time.Second * 10): + log.Warnf("failed waiting for service cleanup, terminating") + } + log.Info("stopped Wiretrustee service") //nolint return nil } @@ -29,11 +37,15 @@ var ( Use: "run", Short: "runs wiretrustee as service", Run: func(cmd *cobra.Command, args []string) { + err := util.InitLog(logLevel, logFile) if err != nil { log.Errorf("failed initializing log %v", err) return } + + SetupCloseHandler() + prg := &program{ cmd: cmd, args: args, diff --git a/client/cmd/up.go b/client/cmd/up.go index f869ba841..6392ee955 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -9,7 +9,6 @@ import ( mgm "github.com/wiretrustee/wiretrustee/management/client" mgmProto "github.com/wiretrustee/wiretrustee/management/proto" signal "github.com/wiretrustee/wiretrustee/signal/client" - "github.com/wiretrustee/wiretrustee/util" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -20,13 +19,8 @@ var ( Use: "up", Short: "install, login and start wiretrustee client", RunE: func(cmd *cobra.Command, args []string) error { - err := util.InitLog(logLevel, logFile) - if err != nil { - log.Errorf("failed initializing log %v", err) - return err - } - err = loginCmd.RunE(cmd, args) + err := loginCmd.RunE(cmd, args) if err != nil { return err } @@ -117,7 +111,7 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK } } - log.Infof("peer logged in to Management Service %s", managementAddr) + log.Debugf("peer logged in to Management Service %s", managementAddr) return client, loginResp, nil } @@ -166,7 +160,7 @@ func runClient() error { } // create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers. - engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel) + engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx) err = engine.Start() if err != nil { log.Errorf("error while starting Wiretrustee Connection Engine: %s", err) @@ -175,14 +169,11 @@ func runClient() error { log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address) - SetupCloseHandler() - select { case <-stopCh: case <-ctx.Done(): } - log.Info("shutting down Wiretrustee client") err = mgmClient.Close() if err != nil { log.Errorf("failed closing Management Service client %v", err) @@ -200,5 +191,8 @@ func runClient() error { return err } + log.Info("stopped Wiretrustee client") + cleanupCh <- struct{}{} + return nil } diff --git a/client/internal/connection.go b/client/internal/connection.go index 62e5df167..3a0f90fcf 100644 --- a/client/internal/connection.go +++ b/client/internal/connection.go @@ -166,7 +166,7 @@ func (conn *Connection) Open(timeout time.Duration) error { select { case remoteAuth := <-conn.remoteAuthChannel: - log.Infof("got a connection confirmation from peer %s", conn.Config.RemoteWgKey.String()) + log.Debugf("got a connection confirmation from peer %s", conn.Config.RemoteWgKey.String()) err = conn.agent.GatherCandidates() if err != nil { @@ -186,8 +186,11 @@ func (conn *Connection) Open(timeout time.Duration) error { if err != nil { return err } + + useProxy := useProxy(pair) + // in case the remote peer is in the local network or one of the peers has public static IP -> no need for a Wireguard proxy, direct communication is possible. - if !useProxy(pair) { + if !useProxy { log.Debugf("it is possible to establish a direct connection (without proxy) to peer %s - my addr: %s, remote addr: %s", conn.Config.RemoteWgKey.String(), pair.Local, pair.Remote) err = conn.wgProxy.StartLocal(fmt.Sprintf("%s:%d", pair.Remote.Address(), iface.WgPort)) if err != nil { @@ -195,19 +198,17 @@ func (conn *Connection) Open(timeout time.Duration) error { } } else { - log.Infof("establishing secure tunnel to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) + log.Debugf("establishing secure tunnel to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) err = conn.wgProxy.Start(remoteConn) if err != nil { return err } } - if pair.Remote.Type() == ice.CandidateTypeRelay || pair.Local.Type() == ice.CandidateTypeRelay { - log.Infof("using relay with peer %s", conn.Config.RemoteWgKey) - } + relayed := pair.Remote.Type() == ice.CandidateTypeRelay || pair.Local.Type() == ice.CandidateTypeRelay conn.Status = StatusConnected - log.Infof("opened connection to peer %s", conn.Config.RemoteWgKey.String()) + log.Infof("opened connection to peer %s [localProxy=%v, relayed=%v]", conn.Config.RemoteWgKey.String(), useProxy, relayed) case <-conn.closeCond.C: conn.Status = StatusDisconnected return fmt.Errorf("connection to peer %s has been closed", conn.Config.RemoteWgKey.String()) @@ -271,7 +272,7 @@ func (conn *Connection) Close() error { var err error conn.closeCond.Do(func() { - log.Warnf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) + log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) if a := conn.agent; a != nil { e := a.Close() diff --git a/client/internal/engine.go b/client/internal/engine.go index d166836b6..f3f1e6b0c 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -57,6 +57,8 @@ type Engine struct { TURNs []*ice.URL cancel context.CancelFunc + + ctx context.Context } // Peer is an instance of the Connection Peer @@ -66,7 +68,7 @@ type Peer struct { } // NewEngine creates a new Connection Engine -func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc) *Engine { +func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine { return &Engine{ signal: signalClient, mgmClient: mgmClient, @@ -77,17 +79,25 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin STUNs: []*ice.URL{}, TURNs: []*ice.URL{}, cancel: cancel, + ctx: ctx, } } func (e *Engine) Stop() error { + err := e.removeAllPeerConnections() + if err != nil { + return err + } + log.Debugf("removing Wiretrustee interface %s", e.config.WgIface) - err := iface.Close() + err = iface.Close() if err != nil { log.Errorf("failed closing Wiretrustee interface %s %v", e.config.WgIface, err) return err } + log.Infof("stopped Wiretrustee Engine") + return nil } @@ -127,7 +137,7 @@ func (e *Engine) Start() error { // initializePeer peer agent attempt to open connection func (e *Engine) initializePeer(peer Peer) { - var backOff = &backoff.ExponentialBackOff{ + var backOff = backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: backoff.DefaultInitialInterval, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, @@ -135,13 +145,14 @@ func (e *Engine) initializePeer(peer Peer) { MaxElapsedTime: time.Duration(0), //never stop Stop: backoff.Stop, Clock: backoff.SystemClock, - } + }, e.ctx) + operation := func() error { _, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer) e.peerMux.Lock() defer e.peerMux.Unlock() if _, ok := e.conns[peer.WgPubKey]; !ok { - log.Infof("removing connection attempt with Peer: %v, not retrying", peer.WgPubKey) + log.Debugf("removed connection attempt to peer: %v, not retrying", peer.WgPubKey) return nil } @@ -172,6 +183,19 @@ func (e *Engine) removePeerConnections(peers []string) error { return nil } +func (e *Engine) removeAllPeerConnections() error { + log.Debugf("removing all peer connections") + e.peerMux.Lock() + defer e.peerMux.Unlock() + for peer := range e.conns { + err := e.removePeerConnection(peer) + if err != nil { + return err + } + } + return nil +} + // removePeerConnection closes existing peer connection and removes peer func (e *Engine) removePeerConnection(peerKey string) error { conn, exists := e.conns[peerKey] @@ -179,6 +203,7 @@ func (e *Engine) removePeerConnection(peerKey string) error { delete(e.conns, peerKey) return conn.Close() } + log.Infof("removed connection to peer %s", peerKey) return nil } @@ -310,7 +335,7 @@ func (e *Engine) receiveManagementEvents() { e.cancel() return } - log.Infof("connected to Management Service updates stream") + log.Debugf("stopped receiving updates from Management Service") }() log.Debugf("connecting to Management Service updates stream") } diff --git a/client/internal/wgproxy.go b/client/internal/wgproxy.go index 0c5f48148..9b2e7e487 100644 --- a/client/internal/wgproxy.go +++ b/client/internal/wgproxy.go @@ -87,7 +87,7 @@ func (p *WgProxy) proxyToRemotePeer(remoteConn *ice.Conn) { for { select { case <-p.close: - log.Infof("stopped proxying from remote peer %s due to closed connection", p.remoteKey) + log.Debugf("stopped proxying from remote peer %s due to closed connection", p.remoteKey) return default: n, err := p.wgConn.Read(buf) @@ -113,7 +113,7 @@ func (p *WgProxy) proxyToLocalWireguard(remoteConn *ice.Conn) { for { select { case <-p.close: - log.Infof("stopped proxying from remote peer %s due to closed connection", p.remoteKey) + log.Debugf("stopped proxying from remote peer %s due to closed connection", p.remoteKey) return default: n, err := remoteConn.Read(buf) diff --git a/management/client/client.go b/management/client/client.go index d011a6af6..6d1c96f9e 100644 --- a/management/client/client.go +++ b/management/client/client.go @@ -65,8 +65,8 @@ func (c *Client) Close() error { } //defaultBackoff is a basic backoff mechanism for general issues -func defaultBackoff() backoff.BackOff { - return &backoff.ExponentialBackOff{ +func defaultBackoff(ctx context.Context) backoff.BackOff { + return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, @@ -74,14 +74,14 @@ func defaultBackoff() backoff.BackOff { MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying Stop: backoff.Stop, Clock: backoff.SystemClock, - } + }, ctx) } // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages // Blocking request. The result will be sent via msgHandler callback function func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { - var backOff = defaultBackoff() + var backOff = defaultBackoff(c.ctx) operation := func() error { @@ -114,7 +114,7 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { err := backoff.Retry(operation, backOff) if err != nil { - log.Errorf("exiting Management Service connection retry loop due to unrecoverable error %s ", err) + log.Warnf("exiting Management Service connection retry loop due to unrecoverable error: %s", err) return err } @@ -145,7 +145,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server return err } if err != nil { - log.Errorf("disconnected from Management Service sync stream: %v", err) + log.Warnf("disconnected from Management Service sync stream: %v", err) return err } @@ -159,7 +159,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server err = msgHandler(decryptedResp) if err != nil { - log.Errorf("failed handling an update message received from Management Service %v", err.Error()) + log.Errorf("failed handling an update message received from Management Service: %v", err.Error()) return err } } diff --git a/signal/client/client.go b/signal/client/client.go index a603be3a8..c8b284c2f 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -76,8 +76,8 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo } //defaultBackoff is a basic backoff mechanism for general issues -func defaultBackoff() backoff.BackOff { - return &backoff.ExponentialBackOff{ +func defaultBackoff(ctx context.Context) backoff.BackOff { + return backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 800 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, @@ -85,7 +85,8 @@ func defaultBackoff() backoff.BackOff { MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying Stop: backoff.Stop, Clock: backoff.SystemClock, - } + }, ctx) + } // Receive Connects to the Signal Exchange message stream and starts receiving messages. @@ -96,12 +97,13 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { c.connWg.Add(1) go func() { - var backOff = defaultBackoff() + var backOff = defaultBackoff(c.ctx) operation := func() error { + err := c.connect(c.key.PublicKey().String(), msgHandler) if err != nil { - log.Warnf("disconnected from the Signal Exchange due to an error %s. Retrying ... ", err) + log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) c.connWg.Add(1) return err } @@ -112,7 +114,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { err := backoff.Retry(operation, backOff) if err != nil { - log.Errorf("error while communicating with the Signal Exchange %s ", err) + log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) return } }()