From 1f5b2d7173d72f4e5e67e6ebfa85b0d043faa8af Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Fri, 24 Jan 2025 11:58:48 -0800 Subject: [PATCH] feat(metrics)_: track raw message sent by type --- metrics/wakumetrics/client.go | 5 +++++ metrics/wakumetrics/metrics.go | 20 ++++++++++++++++++++ protocol/common/message_sender.go | 22 ++++++++++++++++++++++ protocol/common/message_sender_test.go | 6 ++++++ protocol/messenger.go | 1 + protocol/messenger_builder_test.go | 7 +++++++ protocol/messenger_peersyncing.go | 5 +++++ 7 files changed, 66 insertions(+) diff --git a/metrics/wakumetrics/client.go b/metrics/wakumetrics/client.go index 920e9e3131a..00a635ea057 100644 --- a/metrics/wakumetrics/client.go +++ b/metrics/wakumetrics/client.go @@ -161,6 +161,11 @@ func (c *Client) PushSentMessageTotal(messageSize uint32, publishMethod string) MessagesSentTotal.WithLabelValues(publishMethod).Inc() } +func (c *Client) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) { + RawMessagesSizeBytes.WithLabelValues(messageType, pubsubTopic, contentTopic).Add(float64(messageSize)) + RawMessagesSentTotal.WithLabelValues(messageType, pubsubTopic, contentTopic).Inc() +} + func getOriginString(origin wps.Origin) string { switch origin { case wps.Unknown: diff --git a/metrics/wakumetrics/metrics.go b/metrics/wakumetrics/metrics.go index 6b949de0ad8..e28f7ce3aca 100644 --- a/metrics/wakumetrics/metrics.go +++ b/metrics/wakumetrics/metrics.go @@ -119,6 +119,22 @@ var ( }, []string{"peer_id"}, ) + + RawMessagesSizeBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statusgo_waku_raw_message_size_bytes", + Help: "Size of each raw message in bytes sent by this node", + }, + []string{"message_type", "pubsub_topic", "content_topic"}, + ) + + RawMessagesSentTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statusgo_waku_raw_message_sent_total", + Help: "Total number of raw messages sent by this node", + }, + []string{"message_type", "pubsub_topic", "content_topic"}, + ) ) // RegisterMetrics registers all metrics with the provided registry @@ -138,6 +154,8 @@ func RegisterMetrics() error { StoreQueryFailures, MissedMessages, PeerId, + RawMessagesSizeBytes, + RawMessagesSentTotal, } for _, collector := range collectors { @@ -165,6 +183,8 @@ func UnregisterMetrics() error { StoreQueryFailures, MissedMessages, PeerId, + RawMessagesSizeBytes, + RawMessagesSentTotal, } for _, collector := range collectors { diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 12f52825eaf..25148786ef2 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -66,6 +66,10 @@ type MessageEvent struct { RawMessage *RawMessage } +type MetricsHandler interface { + PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) +} + type MessageSender struct { identity *ecdsa.PrivateKey datasync *datasync.DataSync @@ -90,6 +94,8 @@ type MessageSender struct { // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated handleSharedSecrets func([]*sharedsecret.Secret) error + + metricsHandler MetricsHandler } func NewMessageSender( @@ -115,6 +121,10 @@ func NewMessageSender( return p, nil } +func (s *MessageSender) SetMetricsHandler(handler MetricsHandler) { + s.metricsHandler = handler +} + func (s *MessageSender) Stop() { s.messageEventsSubscriptionsMutex.Lock() defer s.messageEventsSubscriptionsMutex.Unlock() @@ -434,6 +444,8 @@ func (s *MessageSender) sendCommunity( zap.String("messageType", "community"), zap.Any("contentType", rawMessage.MessageType), zap.Strings("hashes", types.EncodeHexes(hashes))) + + s.sendBandwidthMetrics(rawMessage) s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -529,6 +541,7 @@ func (s *MessageSender) sendPrivate( zap.String("messageType", "private"), zap.Any("contentType", rawMessage.MessageType), zap.Strings("hashes", types.EncodeHexes(hashes))) + s.sendBandwidthMetrics(rawMessage) s.transport.Track(messageID, hashes, newMessages) } else { @@ -549,6 +562,7 @@ func (s *MessageSender) sendPrivate( zap.Any("contentType", rawMessage.MessageType), zap.String("messageType", "private"), zap.Strings("hashes", types.EncodeHexes(hashes))) + s.sendBandwidthMetrics(rawMessage) s.transport.Track(messageID, hashes, newMessages) } @@ -580,6 +594,7 @@ func (s *MessageSender) SendPairInstallation( return nil, errors.Wrap(err, "failed to send a message spec") } + s.sendBandwidthMetrics(&rawMessage) s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -810,6 +825,7 @@ func (s *MessageSender) SendPublic( zap.Any("contentType", rawMessage.MessageType), zap.String("messageType", "public"), zap.Strings("hashes", types.EncodeHexes(hashes))) + s.sendBandwidthMetrics(&rawMessage) s.transport.Track(messageID, hashes, newMessages) return messageID, nil @@ -1383,3 +1399,9 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error { return nil } + +func (s *MessageSender) sendBandwidthMetrics(rawMessage *RawMessage) { + if s.metricsHandler != nil { + s.metricsHandler.PushRawMessageByType(rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)), rawMessage.PubsubTopic, rawMessage.ContentTopic) + } +} diff --git a/protocol/common/message_sender_test.go b/protocol/common/message_sender_test.go index 76c201229ca..e747cdcff13 100644 --- a/protocol/common/message_sender_test.go +++ b/protocol/common/message_sender_test.go @@ -39,6 +39,11 @@ type MessageSenderSuite struct { logger *zap.Logger } +type mockMetricsHandler struct{} + +func (m *mockMetricsHandler) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) { +} + func (s *MessageSenderSuite) SetupTest() { s.testMessage = protobuf.ChatMessage{ Text: "abc123", @@ -94,6 +99,7 @@ func (s *MessageSenderSuite) SetupTest() { Datasync: true, }, ) + s.sender.SetMetricsHandler(&mockMetricsHandler{}) s.Require().NoError(err) } diff --git a/protocol/messenger.go b/protocol/messenger.go index d4a7eff9d51..6abd2c2e2e6 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -540,6 +540,7 @@ func NewMessenger( if err != nil { return nil, err } + sender.SetMetricsHandler(wakuMetricsHandler) } ctx, cancel := context.WithCancel(context.Background()) diff --git a/protocol/messenger_builder_test.go b/protocol/messenger_builder_test.go index 983bdc164d3..9f5c2db222d 100644 --- a/protocol/messenger_builder_test.go +++ b/protocol/messenger_builder_test.go @@ -53,6 +53,11 @@ func (tmc *testMessengerConfig) complete() error { return nil } +type mockMetricsHandler struct{} + +func (m *mockMetricsHandler) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) { +} + func newTestMessenger(waku wakutypes.Waku, config testMessengerConfig) (*Messenger, error) { err := config.complete() if err != nil { @@ -119,6 +124,8 @@ func newTestMessenger(waku wakutypes.Waku, config testMessengerConfig) (*Messeng return nil, err } + m.sender.SetMetricsHandler(&mockMetricsHandler{}) + return m, nil } diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index b47149f364e..bad1f900b9b 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -415,6 +415,11 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P } m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes))) + if m.wakuMetricsHandler != nil { + for _, message := range newMessages { + m.wakuMetricsHandler.PushRawMessageByType("DATASYNC", uint32(len(message.Payload)), message.PubsubTopic, "") + } + } m.transport.TrackMany(messageIDs, hashes, newMessages) return nil