From d539a0ccbae1946faa2ad73c0c5dee9b4c1a234c Mon Sep 17 00:00:00 2001
From: Paul Noel
Date: Wed, 31 Jan 2024 10:31:35 -0600
Subject: [PATCH] fly/bootstrapMonitor: rename files

---
 fly/cmd/bootstrap_monitor/bsMon.go | 204 ---------------------
 fly/cmd/bootstrap_monitor/main.go  | 283 +++++++++++++++++++++++++++++
 2 files changed, 283 insertions(+), 204 deletions(-)
 delete mode 100644 fly/cmd/bootstrap_monitor/bsMon.go
 create mode 100644 fly/cmd/bootstrap_monitor/main.go

diff --git a/fly/cmd/bootstrap_monitor/bsMon.go b/fly/cmd/bootstrap_monitor/bsMon.go
deleted file mode 100644
index 4fa93d27..00000000
--- a/fly/cmd/bootstrap_monitor/bsMon.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package main
-
-import (
-	"context"
-	"fmt"
-	"log"
-	"os"
-	"strconv"
-	"time"
-
-	"github.com/certusone/wormhole/node/pkg/common"
-	"github.com/certusone/wormhole/node/pkg/p2p"
-	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
-	"github.com/certusone/wormhole/node/pkg/supervisor"
-	ipfslog "github.com/ipfs/go-log/v2"
-	"github.com/joho/godotenv"
-	"github.com/libp2p/go-libp2p/core/crypto"
-	"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
-	"go.uber.org/zap"
-)
-
-var (
-	rootCtx       context.Context
-	rootCtxCancel context.CancelFunc
-	hbReceived    bool
-	// The following are from the .env file:
-	p2pNetworkID   string
-	p2pPort        uint
-	nodeKeyPath    string
-	ethRpcUrl      string
-	coreBridgeAddr string
-	logLevel       string
-)
-
-func loadEnvVars() {
-	err := godotenv.Load() // By default loads .env
-	if err != nil {
-		log.Fatal("Error loading .env file")
-	}
-	p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID")
-	port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32)
-	if err != nil {
-		log.Fatal("Error parsing P2P_PORT")
-	}
-	p2pPort = uint(port)
-	nodeKeyPath = verifyEnvVar("NODE_KEY_PATH")
-	logLevel = verifyEnvVar("LOG_LEVEL")
-	ethRpcUrl = verifyEnvVar("ETH_RPC_URL")
-	coreBridgeAddr = verifyEnvVar("CORE_BRIDGE_ADDR")
-}
-
-func verifyEnvVar(key string) string {
-	value := os.Getenv(key)
-	if value == "" {
-		log.Fatalf("%s must be specified", key)
-	}
-	return value
-}
-
-func main() {
-	loadEnvVars()
-	p2pBootstraps := []string{"/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC",
-		"/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU",
-		"/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"}
-
-	level, err := ipfslog.LevelFromString("info")
-	if err != nil {
-		fmt.Println("Invalid log level")
-		os.Exit(1)
-	}
-
-	logger := ipfslog.Logger("bootstrap-monitor").Desugar()
-
-	ipfslog.SetAllLoggers(level)
-
-	// Load p2p private key
-	var priv crypto.PrivKey
-	priv, err = common.GetOrCreateNodeKey(logger, nodeKeyPath)
-	if err != nil {
-		logger.Fatal("Failed to load node key", zap.Error(err))
-	}
-
-	// Node's main lifecycle context.
-	rootCtx, rootCtxCancel = context.WithCancel(context.Background())
-	defer rootCtxCancel()
-
-	// Heartbeat updates
-	heartbeatC := make(chan *gossipv1.Heartbeat, 50)
-
-	// Guardian set state managed by processor
-	gst := common.NewGuardianSetState(heartbeatC)
-
-	// Inbound observations
-	obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
-
-	// Inbound observation requests
-	obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
-
-	// Inbound signed VAAs
-	signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
-
-	// Governor cfg
-	govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50)
-
-	// Governor status
-	govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50)
-
-	// Bootstrap guardian set, otherwise heartbeats would be skipped
-	idx, sgs, err := utils.FetchCurrentGuardianSet(ethRpcUrl, coreBridgeAddr)
-	if err != nil {
-		logger.Fatal("Failed to fetch guardian set", zap.Error(err))
-	}
-	logger.Info("guardian set", zap.Uint32("index", idx), zap.Any("gs", sgs))
-	gs := common.GuardianSet{
-		Keys:  sgs.Keys,
-		Index: idx,
-	}
-	gst.Set(&gs)
-
-	for _, bootstrap := range p2pBootstraps {
-		localContext, localCancel := context.WithCancel(rootCtx)
-		go func() {
-			for {
-				select {
-				case <-localContext.Done():
-					return
-				case hb := <-heartbeatC:
-					id := hb.NodeName
-					now := time.Now()
-					logger.Info("heartbeat", zap.String("id", id), zap.Time("time", now))
-					hbReceived = true
-				case <-obsvC:
-				case <-obsvReqC:
-				case <-signedInC:
-				case <-govConfigC:
-				case <-govStatusC:
-				}
-			}
-		}()
-		logger.Info("Starting p2p", zap.String("bootstrap", bootstrap))
-		hbReceived = false
-		bootstrapPeer := bootstrap
-		components := p2p.DefaultComponents()
-		components.Port = p2pPort
-		supervisor.New(localContext, logger, func(ctx context.Context) error {
-			if err := supervisor.Run(ctx,
-				"p2p",
-				p2p.Run(obsvC,
-					obsvReqC,
-					nil,
-					nil,
-					signedInC,
-					priv,
-					nil,
-					gst,
-					p2pNetworkID,
-					bootstrapPeer,
-					"",
-					false,
-					localCancel,
-					nil,
-					nil,
-					govConfigC,
-					govStatusC,
-					components,
-					nil,
-					false,
-					false,
-					nil,
-					nil,
-					"",
-					0,
-					"",
-				)); err != nil {
-				return err
-			}
-
-			logger.Info("Started internal services")
-
-			<-ctx.Done()
-			return nil
-		},
-			// It's safer to crash and restart the process in case we encounter a panic,
-			// rather than attempting to reschedule the runnable.
-			supervisor.WithPropagatePanic)
-		// Max time to wait for a heartbeat
-		time.Sleep(15 * time.Second)
-		if hbReceived {
-			logger.Info("Heartbeat received for", zap.String("bootstrap peer", bootstrapPeer))
-		} else {
-			logger.Info("No heartbeat received for", zap.String("bootstrap peer", bootstrapPeer))
-		}
-		localCancel()
-		logger.Info("local context cancelled")
-		// This is the udp port timeout
-		time.Sleep(40 * time.Second)
-	}
-
-	logger.Info("All bootstrap peers exhausted, exiting...")
-	<-rootCtx.Done()
-	logger.Info("root context cancelled, exiting...")
-	// TODO: wait for things to shut down gracefully
-
-}
diff --git a/fly/cmd/bootstrap_monitor/main.go b/fly/cmd/bootstrap_monitor/main.go
new file mode 100644
index 00000000..57d6482a
--- /dev/null
+++ b/fly/cmd/bootstrap_monitor/main.go
@@ -0,0 +1,283 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/p2p"
+	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
+	promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
+	ipfslog "github.com/ipfs/go-log/v2"
+	"github.com/joho/godotenv"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+)
+
+const (
+	// heartbeatTimeout is the max time to wait for a heartbeat from one peer.
+	heartbeatTimeout = 15 * time.Second
+	// udpPortTimeout is the pause after a check so the udp port can time out.
+	udpPortTimeout = 40 * time.Second
+	// checkInterval is the pause between full passes over the bootstrap peers.
+	checkInterval = 15 * time.Minute
+	// scrapeInterval is how often local metrics are scraped and pushed.
+	scrapeInterval = 15 * time.Second
+)
+
+var (
+	rootCtx       context.Context
+	rootCtxCancel context.CancelFunc
+	// The following are from the .env file:
+	p2pNetworkID   string
+	p2pPort        uint
+	nodeKeyPath    string
+	ethRpcUrl      string
+	coreBridgeAddr string
+	logLevel       string
+	promRemoteURL  string
+)
+
+// bootstrapPeerStatus records, per bootstrap peer, the result of its last check.
+var bootstrapPeerStatus = promauto.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: "bootstrap_peer_status",
+		Help: "Bootstrap peer status (1 = received heartbeat, 0 = no heartbeat)",
+	}, []string{"bootstrap_peer"})
+
+// loadEnvVars loads the .env file and fills in the package-level settings,
+// exiting the process when a required variable is missing or malformed.
+func loadEnvVars() {
+	if err := godotenv.Load(); err != nil { // By default loads .env
+		log.Fatalf("Error loading .env file: %v", err)
+	}
+	p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID")
+	port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32)
+	if err != nil {
+		log.Fatalf("Error parsing P2P_PORT: %v", err)
+	}
+	p2pPort = uint(port)
+	nodeKeyPath = verifyEnvVar("NODE_KEY_PATH")
+	logLevel = verifyEnvVar("LOG_LEVEL")
+	ethRpcUrl = verifyEnvVar("ETH_RPC_URL")
+	coreBridgeAddr = verifyEnvVar("CORE_BRIDGE_ADDR")
+	// Optional: main only starts the remote-write scraper when this is set.
+	promRemoteURL = os.Getenv("PROM_REMOTE_URL")
+}
+
+// verifyEnvVar returns the value of the environment variable key, exiting the
+// process when it is unset or empty.
+func verifyEnvVar(key string) string {
+	value := os.Getenv(key)
+	if value == "" {
+		log.Fatalf("%s must be specified", key)
+	}
+	return value
+}
+
+// RunPrometheusScraper starts a background runnable that, every scrapeInterval,
+// scrapes the local metrics and sends them to the remote-write endpoint
+// described by info. The runnable stops when ctx is cancelled. The returned
+// error is always nil.
+func RunPrometheusScraper(ctx context.Context, logger *zap.Logger, info promremotew.PromTelemetryInfo) error {
+	promLogger := logger.With(zap.String("component", "prometheus_scraper"))
+	errC := make(chan error)
+	common.StartRunnable(ctx, errC, false, "prometheus_scraper", func(ctx context.Context) error {
+		t := time.NewTicker(scrapeInterval)
+		defer t.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-t.C:
+				// Nothing in this file reads errC, and returning would end this
+				// loop. Log the failure and retry on the next tick instead.
+				if err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger); err != nil {
+					promLogger.Error("ScrapeAndSendLocalMetrics error", zap.Error(err))
+				}
+			}
+		}
+	})
+	return nil
+}
+
+// sleepCtx pauses for d. It reports false if ctx was cancelled before d elapsed.
+func sleepCtx(ctx context.Context, d time.Duration) bool {
+	t := time.NewTimer(d)
+	defer t.Stop()
+	select {
+	case <-ctx.Done():
+		return false
+	case <-t.C:
+		return true
+	}
+}
+
+// checkBootstrapPeer starts a p2p host bootstrapped only from bootstrapPeer,
+// subscribes to the broadcast topic and reports whether a signed heartbeat
+// arrives within heartbeatTimeout. The subscription, topic and host are torn
+// down before it returns, so nothing from one check leaks into the next.
+func checkBootstrapPeer(ctx context.Context, logger *zap.Logger, priv crypto.PrivKey, bootstrapPeer string) bool {
+	localContext, localCancel := context.WithCancel(ctx)
+	defer localCancel()
+
+	components := p2p.DefaultComponents()
+	components.Port = p2pPort
+
+	host, err := p2p.NewHost(logger, localContext, p2pNetworkID, bootstrapPeer, components, priv)
+	if err != nil {
+		logger.Panic("failed to create p2p host", zap.Error(err))
+	}
+	defer func() {
+		if err := host.Close(); err != nil {
+			logger.Error("Error closing the host", zap.Error(err))
+		}
+	}()
+
+	ps, err := pubsub.NewGossipSub(localContext, host)
+	if err != nil {
+		logger.Panic("failed to create gossipsub", zap.Error(err))
+	}
+
+	topic := fmt.Sprintf("%s/%s", p2pNetworkID, "broadcast")
+	topicHandle, err := ps.Join(topic)
+	if err != nil {
+		logger.Panic("failed to join broadcast topic", zap.String("topic", topic), zap.Error(err))
+	}
+	defer func() {
+		if err := topicHandle.Close(); err != nil {
+			logger.Error("Error closing the broadcast topic", zap.Error(err))
+		}
+	}()
+
+	sub, err := topicHandle.Subscribe()
+	if err != nil {
+		logger.Panic("failed to subscribe to broadcast topic", zap.Error(err))
+	}
+	defer sub.Cancel()
+
+	// Bound only the receive loop, so the full heartbeatTimeout is spent
+	// listening rather than partly on host setup.
+	recvCtx, recvCancel := context.WithTimeout(localContext, heartbeatTimeout)
+	defer recvCancel()
+
+	for {
+		envelope, err := sub.Next(recvCtx)
+		if err != nil {
+			// A context error is the expected way out (timeout or shutdown).
+			if recvCtx.Err() == nil {
+				logger.Error("failed to receive pubsub message", zap.Error(err))
+			}
+			return false
+		}
+		var msg gossipv1.GossipMessage
+		if err := proto.Unmarshal(envelope.Data, &msg); err != nil {
+			logger.Info("received invalid message",
+				zap.Binary("data", envelope.Data),
+				zap.String("from", envelope.GetFrom().String()))
+			continue
+		}
+		// Only look at heartbeats
+		if msg.GetSignedHeartbeat() == nil {
+			continue
+		}
+		logger.Info("received Heartbeat")
+		return true
+	}
+}
+
+// monitorBootstrapPeers checks every peer in turn, publishes the result to
+// bootstrapPeerStatus, then sleeps for checkInterval and starts over. It
+// returns once ctx is cancelled.
+func monitorBootstrapPeers(ctx context.Context, logger *zap.Logger, priv crypto.PrivKey, peers []string) {
+	for {
+		for _, bootstrapPeer := range peers {
+			logger.Info("Starting p2p", zap.String("bootstrap", bootstrapPeer))
+			hbReceived := checkBootstrapPeer(ctx, logger, priv, bootstrapPeer)
+			if ctx.Err() != nil {
+				// Shutting down: the result says nothing about the peer.
+				return
+			}
+
+			if hbReceived {
+				logger.Info("Heartbeat received for", zap.String("bootstrap peer", bootstrapPeer))
+				bootstrapPeerStatus.WithLabelValues(bootstrapPeer).Set(1)
+			} else {
+				logger.Info("******** ALERT ********** No heartbeat received for", zap.String("bootstrap peer", bootstrapPeer))
+				bootstrapPeerStatus.WithLabelValues(bootstrapPeer).Set(0)
+				// TODO: Do alarm stuff here
+			}
+
+			// This is the udp port timeout
+			if !sleepCtx(ctx, udpPortTimeout) {
+				return
+			}
+		}
+		logger.Info("Sleeping for 15 minutes")
+		if !sleepCtx(ctx, checkInterval) {
+			return
+		}
+	}
+}
+
+// main loads the configuration, optionally starts the Prometheus remote-write
+// scraper, and then monitors the bootstrap peers until SIGINT or SIGTERM.
+func main() {
+	loadEnvVars()
+	p2pBootstraps := []string{"/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC",
+		"/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU",
+		"/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"}
+
+	// Honor LOG_LEVEL from the .env file.
+	level, err := ipfslog.LevelFromString(logLevel)
+	if err != nil {
+		fmt.Println("Invalid log level")
+		os.Exit(1)
+	}
+
+	logger := ipfslog.Logger("bootstrap-monitor").Desugar()
+
+	ipfslog.SetAllLoggers(level)
+
+	// Load p2p private key
+	priv, err := common.GetOrCreateNodeKey(logger, nodeKeyPath)
+	if err != nil {
+		logger.Fatal("Failed to load node key", zap.Error(err))
+	}
+
+	// Main lifecycle context, cancelled on SIGINT/SIGTERM so the monitor loop
+	// can stop and the shutdown code below is reachable.
+	rootCtx, rootCtxCancel = signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+	defer rootCtxCancel()
+
+	// Start the Prometheus scraper
+	if promRemoteURL != "" {
+		var info promremotew.PromTelemetryInfo
+		info.PromRemoteURL = promRemoteURL
+		info.Labels = map[string]string{
+			"network": p2pNetworkID,
+			"product": "bootstrap_monitor",
+		}
+
+		if err := RunPrometheusScraper(rootCtx, logger, info); err != nil {
+			logger.Fatal("Failed to start prometheus scraper", zap.Error(err))
+		}
+	}
+
+	// Runs the p2p heartbeat checks every 15 minutes until rootCtx is cancelled.
+	monitorBootstrapPeers(rootCtx, logger, priv, p2pBootstraps)
+
+	rootCtxCancel()
+	logger.Info("root context cancelled, exiting...")
+}