From a7ddb8f1f8fe2893d90132ee7de038d0adc93b46 Mon Sep 17 00:00:00 2001
From: Viktor Liu <17948409+lixmal@users.noreply.github.com>
Date: Tue, 28 Jan 2025 12:25:45 +0100
Subject: [PATCH] [client] Replace engine probes with direct calls (#3195)

The Probe/ProbeHolder indirection required wiring four probe channels
from the daemon server through ConnectClient into the engine. Replace
it with a direct Engine.RunHealthProbes() call, let the daemon fetch
the engine from its ConnectClient, and make Status return early when
the caller's context is already cancelled.

---
 client/cmd/up.go           |   2 +-
 client/internal/connect.go |  17 ++---
 client/internal/engine.go  | 127 ++++++++++++-------------------
 client/internal/probe.go   |  58 -----------------
 client/server/server.go    |  46 ++++++--------
 5 files changed, 66 insertions(+), 184 deletions(-)
 delete mode 100644 client/internal/probe.go

diff --git a/client/cmd/up.go b/client/cmd/up.go
index 9f8f738bc84..f7c2bbfe4c6 100644
--- a/client/cmd/up.go
+++ b/client/cmd/up.go
@@ -190,7 +190,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command) error {
 	r.GetFullStatus()
 
 	connectClient := internal.NewConnectClient(ctx, config, r)
-	return connectClient.Run()
+	return connectClient.Run(nil)
 }
 
 func runInDaemonMode(ctx context.Context, cmd *cobra.Command) error {
diff --git a/client/internal/connect.go b/client/internal/connect.go
index a1e8f0f8c04..3e3f04f171e 100644
--- a/client/internal/connect.go
+++ b/client/internal/connect.go
@@ -59,13 +59,8 @@ func NewConnectClient(
 }
 
 // Run with main logic.
-func (c *ConnectClient) Run() error {
-	return c.run(MobileDependency{}, nil, nil)
-}
-
-// RunWithProbes runs the client's main logic with probes attached
-func (c *ConnectClient) RunWithProbes(probes *ProbeHolder, runningChan chan error) error {
-	return c.run(MobileDependency{}, probes, runningChan)
+func (c *ConnectClient) Run(runningChan chan error) error {
+	return c.run(MobileDependency{}, runningChan)
 }
 
 // RunOnAndroid with main logic on mobile system
@@ -84,7 +79,7 @@ func (c *ConnectClient) RunOnAndroid(
 		HostDNSAddresses: dnsAddresses,
 		DnsReadyListener: dnsReadyListener,
 	}
-	return c.run(mobileDependency, nil, nil)
+	return c.run(mobileDependency, nil)
 }
 
 func (c *ConnectClient) RunOniOS(
@@ -102,10 +97,10 @@ func (c *ConnectClient) RunOniOS(
 		DnsManager:    dnsManager,
 		StateFilePath: stateFilePath,
 	}
-	return c.run(mobileDependency, nil, nil)
+	return c.run(mobileDependency, nil)
 }
 
-func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHolder, runningChan chan error) error {
+func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan error) error {
 	defer func() {
 		if r := recover(); r != nil {
 			log.Panicf("Panic occurred: %v, stack trace: %s", r, string(debug.Stack()))
@@ -261,7 +256,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
 	checks := loginResp.GetChecks()
 
 	c.engineMutex.Lock()
-	c.engine = NewEngineWithProbes(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, probes, checks)
+	c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
 	c.engine.SetNetworkMapPersistence(c.persistNetworkMap)
 	c.engineMutex.Unlock()
 
diff --git a/client/internal/engine.go b/client/internal/engine.go
index 43749fbe552..4f69adfa6d7 100644
--- a/client/internal/engine.go
+++ b/client/internal/engine.go
@@ -175,8 +175,6 @@ type Engine struct {
 
 	dnsServer dns.Server
 
-	probes *ProbeHolder
-
 	// checks are the client-applied posture checks that need to be evaluated on the client
 	checks []*mgmProto.Checks
 
@@ -196,7 +194,7 @@ type Peer struct {
 	WgAllowedIps string
 }
 
-// NewEngine creates a new Connection Engine
+// NewEngine creates a new Connection Engine with on-demand health probes
 func NewEngine(
 	clientCtx context.Context,
 	clientCancel context.CancelFunc,
@@ -207,33 +205,6 @@ func NewEngine(
 	mobileDep MobileDependency,
 	statusRecorder *peer.Status,
 	checks []*mgmProto.Checks,
-) *Engine {
-	return NewEngineWithProbes(
-		clientCtx,
-		clientCancel,
-		signalClient,
-		mgmClient,
-		relayManager,
-		config,
-		mobileDep,
-		statusRecorder,
-		nil,
-		checks,
-	)
-}
-
-// NewEngineWithProbes creates a new Connection Engine with probes attached
-func NewEngineWithProbes(
-	clientCtx context.Context,
-	clientCancel context.CancelFunc,
-	signalClient signal.Client,
-	mgmClient mgm.Client,
-	relayManager *relayClient.Manager,
-	config *EngineConfig,
-	mobileDep MobileDependency,
-	statusRecorder *peer.Status,
-	probes *ProbeHolder,
-	checks []*mgmProto.Checks,
 ) *Engine {
 	engine := &Engine{
 		clientCtx:      clientCtx,
@@ -251,7 +222,6 @@ func NewEngineWithProbes(
 		networkSerial:  0,
 		sshServerFunc:  nbssh.DefaultSSHServer,
 		statusRecorder: statusRecorder,
-		probes:         probes,
 		checks:         checks,
 		connSemaphore:  semaphoregroup.NewSemaphoreGroup(connInitLimit),
 	}
@@ -450,7 +420,6 @@ func (e *Engine) Start() error {
 
 	e.receiveSignalEvents()
 	e.receiveManagementEvents()
-	e.receiveProbeEvents()
 
 	// starting network monitor at the very last to avoid disruptions
 	e.startNetworkMonitor()
@@ -1513,72 +1482,58 @@ func (e *Engine) getRosenpassAddr() string {
 	return ""
 }
 
-func (e *Engine) receiveProbeEvents() {
-	if e.probes == nil {
-		return
-	}
-	if e.probes.SignalProbe != nil {
-		go e.probes.SignalProbe.Receive(e.ctx, func() bool {
-			healthy := e.signal.IsHealthy()
-			log.Debugf("received signal probe request, healthy: %t", healthy)
-			return healthy
-		})
-	}
+// RunHealthProbes executes health checks for Signal, Management, Relay and WireGuard services
+// and updates the status recorder with the latest states.
+func (e *Engine) RunHealthProbes() bool {
+	signalHealthy := e.signal.IsHealthy()
+	log.Debugf("signal health check: healthy=%t", signalHealthy)
 
-	if e.probes.MgmProbe != nil {
-		go e.probes.MgmProbe.Receive(e.ctx, func() bool {
-			healthy := e.mgmClient.IsHealthy()
-			log.Debugf("received management probe request, healthy: %t", healthy)
-			return healthy
-		})
-	}
+	managementHealthy := e.mgmClient.IsHealthy()
+	log.Debugf("management health check: healthy=%t", managementHealthy)
 
-	if e.probes.RelayProbe != nil {
-		go e.probes.RelayProbe.Receive(e.ctx, func() bool {
-			healthy := true
+	results := append(e.probeSTUNs(), e.probeTURNs()...)
+	e.statusRecorder.UpdateRelayStates(results)
 
-			results := append(e.probeSTUNs(), e.probeTURNs()...)
-			e.statusRecorder.UpdateRelayStates(results)
-
-			// A single failed server will result in a "failed" probe
-			for _, res := range results {
-				if res.Err != nil {
-					healthy = false
-					break
-				}
-			}
-
-			log.Debugf("received relay probe request, healthy: %t", healthy)
-			return healthy
-		})
+	relayHealthy := true
+	for _, res := range results {
+		if res.Err != nil {
+			relayHealthy = false
+			break
+		}
 	}
+	log.Debugf("relay health check: healthy=%t", relayHealthy)
 
-	if e.probes.WgProbe != nil {
-		go e.probes.WgProbe.Receive(e.ctx, func() bool {
-			log.Debug("received wg probe request")
-
-			for _, key := range e.peerStore.PeersPubKey() {
-				wgStats, err := e.wgInterface.GetStats(key)
-				if err != nil {
-					log.Debugf("failed to get wg stats for peer %s: %s", key, err)
-				}
-				// wgStats could be zero value, in which case we just reset the stats
-				if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
-					log.Debugf("failed to update wg stats for peer %s: %s", key, err)
-				}
-			}
-
-			return true
-		})
+	for _, key := range e.peerStore.PeersPubKey() {
+		wgStats, err := e.wgInterface.GetStats(key)
+		if err != nil {
+			log.Debugf("failed to get wg stats for peer %s: %s", key, err)
+			continue
+		}
+		// wgStats could be zero value, in which case we just reset the stats
+		if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
+			log.Debugf("failed to update wg stats for peer %s: %s", key, err)
+		}
 	}
+
+	allHealthy := signalHealthy && managementHealthy && relayHealthy
+	log.Debugf("all health checks completed: healthy=%t", allHealthy)
+	return allHealthy
 }
 
 func (e *Engine) probeSTUNs() []relay.ProbeResult {
-	return relay.ProbeAll(e.ctx, relay.ProbeSTUN, e.STUNs)
+	e.syncMsgMux.Lock()
+	stuns := slices.Clone(e.STUNs)
+	e.syncMsgMux.Unlock()
+
+	return relay.ProbeAll(e.ctx, relay.ProbeSTUN, stuns)
 }
 
 func (e *Engine) probeTURNs() []relay.ProbeResult {
-	return relay.ProbeAll(e.ctx, relay.ProbeTURN, e.TURNs)
+	e.syncMsgMux.Lock()
+	turns := slices.Clone(e.TURNs)
+	e.syncMsgMux.Unlock()
+
+	return relay.ProbeAll(e.ctx, relay.ProbeTURN, turns)
 }
 
 func (e *Engine) restartEngine() {
diff --git a/client/internal/probe.go b/client/internal/probe.go
deleted file mode 100644
index 23290cf74a4..00000000000
--- a/client/internal/probe.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package internal
-
-import "context"
-
-type ProbeHolder struct {
-	MgmProbe    *Probe
-	SignalProbe *Probe
-	RelayProbe  *Probe
-	WgProbe     *Probe
-}
-
-// Probe allows to run on-demand callbacks from different code locations.
-// Pass the probe to a receiving and a sending end. The receiving end starts listening
-// to requests with Receive and executes a callback when the sending end requests it
-// by calling Probe.
-type Probe struct {
-	request chan struct{}
-	result  chan bool
-	ready   bool
-}
-
-// NewProbe returns a new initialized probe.
-func NewProbe() *Probe {
-	return &Probe{
-		request: make(chan struct{}),
-		result:  make(chan bool),
-	}
-}
-
-// Probe requests the callback to be run and returns a bool indicating success.
-// It always returns true as long as the receiver is not ready.
-func (p *Probe) Probe() bool {
-	if !p.ready {
-		return true
-	}
-
-	p.request <- struct{}{}
-	return <-p.result
-}
-
-// Receive starts listening for probe requests. On such a request it runs the supplied
-// callback func which must return a bool indicating success.
-// Blocks until the passed context is cancelled.
-func (p *Probe) Receive(ctx context.Context, callback func() bool) {
-	p.ready = true
-	defer func() {
-		p.ready = false
-	}()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-p.request:
-			p.result <- callback()
-		}
-	}
-}
diff --git a/client/server/server.go b/client/server/server.go
index 638ede386c9..42420d1c127 100644
--- a/client/server/server.go
+++ b/client/server/server.go
@@ -63,12 +63,7 @@ type Server struct {
 	statusRecorder *peer.Status
 	sessionWatcher *internal.SessionWatcher
 
-	mgmProbe    *internal.Probe
-	signalProbe *internal.Probe
-	relayProbe  *internal.Probe
-	wgProbe     *internal.Probe
-	lastProbe   time.Time
-
+	lastProbe         time.Time
 	persistNetworkMap bool
 }
 
@@ -86,12 +81,7 @@ func New(ctx context.Context, configPath, logFile string) *Server {
 		latestConfigInput: internal.ConfigInput{
 			ConfigPath: configPath,
 		},
-		logFile:     logFile,
-		mgmProbe:    internal.NewProbe(),
-		signalProbe: internal.NewProbe(),
-		relayProbe:  internal.NewProbe(),
-		wgProbe:     internal.NewProbe(),
-
+		logFile:           logFile,
 		persistNetworkMap: true,
 	}
 }
@@ -202,14 +192,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *internal.Conf
 		s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
 		s.connectClient.SetNetworkMapPersistence(s.persistNetworkMap)
 
-		probes := internal.ProbeHolder{
-			MgmProbe:    s.mgmProbe,
-			SignalProbe: s.signalProbe,
-			RelayProbe:  s.relayProbe,
-			WgProbe:     s.wgProbe,
-		}
-
-		err := s.connectClient.RunWithProbes(&probes, runningChan)
+		err := s.connectClient.Run(runningChan)
 		if err != nil {
 			log.Debugf("run client connection exited with error: %v. Will retry in the background", err)
 		}
@@ -676,9 +659,13 @@ func (s *Server) Down(ctx context.Context, _ *proto.DownRequest) (*proto.DownRes
 
 // Status returns the daemon status
 func (s *Server) Status(
-	_ context.Context,
+	ctx context.Context,
 	msg *proto.StatusRequest,
 ) (*proto.StatusResponse, error) {
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
@@ -707,14 +694,17 @@ func (s *Server) Status(
 }
 
 func (s *Server) runProbes() {
-	if time.Since(s.lastProbe) > probeThreshold {
-		managementHealthy := s.mgmProbe.Probe()
-		signalHealthy := s.signalProbe.Probe()
-		relayHealthy := s.relayProbe.Probe()
-		wgProbe := s.wgProbe.Probe()
+	if s.connectClient == nil {
+		return
+	}
 
-		// Update last time only if all probes were successful
-		if managementHealthy && signalHealthy && relayHealthy && wgProbe {
+	engine := s.connectClient.Engine()
+	if engine == nil {
+		return
+	}
+
+	if time.Since(s.lastProbe) > probeThreshold {
+		if engine.RunHealthProbes() {
 			s.lastProbe = time.Now()
 		}
 	}
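Reviewer note (not part of the patch): with the probe channels gone, the
health checks run inline in the daemon's status path (runProbes is invoked
when serving Status, rate-limited by probeThreshold; the call site is
outside this diff), and Status now returns early if the caller's context
is already cancelled. A client can therefore bound the extra probe latency
with a deadline. The sketch below is illustrative only; the socket path,
the three-second budget, and the main() wiring are assumptions, not code
from this change.

	// statusprobe.go - illustrative sketch, not part of this patch.
	package main

	import (
		"context"
		"fmt"
		"log"
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"

		"github.com/netbirdio/netbird/client/proto"
	)

	func main() {
		// Assumed default daemon socket on Linux; adjust per platform.
		conn, err := grpc.NewClient("unix:///var/run/netbird.sock",
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			log.Fatalf("dial daemon: %v", err)
		}
		defer conn.Close()

		// Status may now trigger Engine.RunHealthProbes inline, so give
		// the call a deadline instead of letting slow relays block it.
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()

		resp, err := proto.NewDaemonServiceClient(conn).Status(ctx, &proto.StatusRequest{})
		if err != nil {
			log.Fatalf("status: %v", err)
		}
		fmt.Println("daemon status:", resp.GetStatus())
	}

Running this against a live daemon and cancelling the context early
exercises the new ctx.Err() guard in Server.Status.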