Skip to content

Commit

Permalink
feat(metrics)_: track raw message sent by type
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Jan 24, 2025
1 parent 5340c57 commit 1f5b2d7
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 0 deletions.
5 changes: 5 additions & 0 deletions metrics/wakumetrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ func (c *Client) PushSentMessageTotal(messageSize uint32, publishMethod string)
MessagesSentTotal.WithLabelValues(publishMethod).Inc()
}

func (c *Client) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) {
RawMessagesSizeBytes.WithLabelValues(messageType, pubsubTopic, contentTopic).Add(float64(messageSize))
RawMessagesSentTotal.WithLabelValues(messageType, pubsubTopic, contentTopic).Inc()
}

func getOriginString(origin wps.Origin) string {
switch origin {
case wps.Unknown:
Expand Down
20 changes: 20 additions & 0 deletions metrics/wakumetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ var (
},
[]string{"peer_id"},
)

RawMessagesSizeBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statusgo_waku_raw_message_size_bytes",
Help: "Size of each raw message in bytes sent by this node",
},
[]string{"message_type", "pubsub_topic", "content_topic"},
)

RawMessagesSentTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statusgo_waku_raw_message_sent_total",
Help: "Total number of raw messages sent by this node",
},
[]string{"message_type", "pubsub_topic", "content_topic"},
)
)

// RegisterMetrics registers all metrics with the provided registry
Expand All @@ -138,6 +154,8 @@ func RegisterMetrics() error {
StoreQueryFailures,
MissedMessages,
PeerId,
RawMessagesSizeBytes,
RawMessagesSentTotal,
}

for _, collector := range collectors {
Expand Down Expand Up @@ -165,6 +183,8 @@ func UnregisterMetrics() error {
StoreQueryFailures,
MissedMessages,
PeerId,
RawMessagesSizeBytes,
RawMessagesSentTotal,
}

for _, collector := range collectors {
Expand Down
22 changes: 22 additions & 0 deletions protocol/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type MessageEvent struct {
RawMessage *RawMessage
}

type MetricsHandler interface {
PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string)
}

type MessageSender struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
Expand All @@ -90,6 +94,8 @@ type MessageSender struct {

// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error

metricsHandler MetricsHandler
}

func NewMessageSender(
Expand All @@ -115,6 +121,10 @@ func NewMessageSender(
return p, nil
}

func (s *MessageSender) SetMetricsHandler(handler MetricsHandler) {
s.metricsHandler = handler
}

func (s *MessageSender) Stop() {
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
Expand Down Expand Up @@ -434,6 +444,8 @@ func (s *MessageSender) sendCommunity(
zap.String("messageType", "community"),
zap.Any("contentType", rawMessage.MessageType),
zap.Strings("hashes", types.EncodeHexes(hashes)))

s.sendBandwidthMetrics(rawMessage)
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -529,6 +541,7 @@ func (s *MessageSender) sendPrivate(
zap.String("messageType", "private"),
zap.Any("contentType", rawMessage.MessageType),
zap.Strings("hashes", types.EncodeHexes(hashes)))
s.sendBandwidthMetrics(rawMessage)
s.transport.Track(messageID, hashes, newMessages)

} else {
Expand All @@ -549,6 +562,7 @@ func (s *MessageSender) sendPrivate(
zap.Any("contentType", rawMessage.MessageType),
zap.String("messageType", "private"),
zap.Strings("hashes", types.EncodeHexes(hashes)))
s.sendBandwidthMetrics(rawMessage)
s.transport.Track(messageID, hashes, newMessages)
}

Expand Down Expand Up @@ -580,6 +594,7 @@ func (s *MessageSender) SendPairInstallation(
return nil, errors.Wrap(err, "failed to send a message spec")
}

s.sendBandwidthMetrics(&rawMessage)
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -810,6 +825,7 @@ func (s *MessageSender) SendPublic(
zap.Any("contentType", rawMessage.MessageType),
zap.String("messageType", "public"),
zap.Strings("hashes", types.EncodeHexes(hashes)))
s.sendBandwidthMetrics(&rawMessage)
s.transport.Track(messageID, hashes, newMessages)

return messageID, nil
Expand Down Expand Up @@ -1383,3 +1399,9 @@ func (s *MessageSender) CleanupHashRatchetEncryptedMessages() error {

return nil
}

func (s *MessageSender) sendBandwidthMetrics(rawMessage *RawMessage) {
if s.metricsHandler != nil {
s.metricsHandler.PushRawMessageByType(rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)), rawMessage.PubsubTopic, rawMessage.ContentTopic)
}
}
6 changes: 6 additions & 0 deletions protocol/common/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type MessageSenderSuite struct {
logger *zap.Logger
}

type mockMetricsHandler struct{}

func (m *mockMetricsHandler) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) {
}

func (s *MessageSenderSuite) SetupTest() {
s.testMessage = protobuf.ChatMessage{
Text: "abc123",
Expand Down Expand Up @@ -94,6 +99,7 @@ func (s *MessageSenderSuite) SetupTest() {
Datasync: true,
},
)
s.sender.SetMetricsHandler(&mockMetricsHandler{})
s.Require().NoError(err)
}

Expand Down
1 change: 1 addition & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ func NewMessenger(
if err != nil {
return nil, err
}
sender.SetMetricsHandler(wakuMetricsHandler)
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
7 changes: 7 additions & 0 deletions protocol/messenger_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (tmc *testMessengerConfig) complete() error {
return nil
}

type mockMetricsHandler struct{}

func (m *mockMetricsHandler) PushRawMessageByType(messageType string, messageSize uint32, pubsubTopic string, contentTopic string) {
}

func newTestMessenger(waku wakutypes.Waku, config testMessengerConfig) (*Messenger, error) {
err := config.complete()
if err != nil {
Expand Down Expand Up @@ -119,6 +124,8 @@ func newTestMessenger(waku wakutypes.Waku, config testMessengerConfig) (*Messeng
return nil, err
}

m.sender.SetMetricsHandler(&mockMetricsHandler{})

return m, nil
}

Expand Down
5 changes: 5 additions & 0 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,11 @@ func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.P
}

m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
if m.wakuMetricsHandler != nil {
for _, message := range newMessages {
m.wakuMetricsHandler.PushRawMessageByType("DATASYNC", uint32(len(message.Payload)), message.PubsubTopic, "")
}
}
m.transport.TrackMany(messageIDs, hashes, newMessages)

return nil
Expand Down

0 comments on commit 1f5b2d7

Please sign in to comment.