Skip to content

Commit

Permalink
feat(telemetry)_: include device type in metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Aug 7, 2024
1 parent c688542 commit c44f9fc
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
11 changes: 11 additions & 0 deletions api/geth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,17 @@ func (b *GethStatusBackend) injectAccountsIntoWakuService(w types.WakuKeyManager
b.statusNode.ChatService(accDB).Init(messenger)
b.statusNode.EnsService().Init(messenger.SyncEnsNamesWithDispatchMessage)
b.statusNode.CommunityTokensService().Init(messenger)

if messenger != nil && b.statusNode.WakuV2Service() != nil {
var deviceType string
for _, inst := range messenger.Installations() {
if inst.ID == messenger.InstallationID() {
deviceType = inst.InstallationMetadata.DeviceType
break
}
}
b.statusNode.WakuV2Service().InitTelemetryClient(deviceType)
}
}

return nil
Expand Down
5 changes: 4 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,10 @@ func NewMessenger(

var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, telemetry.WithPeerID(peerId.String()))
options := []telemetry.TelemetryClientOption{
telemetry.WithPeerID(peerId.String()),
}
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
Expand Down
18 changes: 18 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Client struct {
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
deviceType string
}

type TelemetryClientOption func(*Client)
Expand All @@ -125,6 +126,12 @@ func WithPeerID(peerId string) TelemetryClientOption {
}
}

func WithDeviceType(deviceType string) TelemetryClientOption {
return func(c *Client) {
c.deviceType = deviceType
}
}

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
Expand Down Expand Up @@ -152,6 +159,10 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
return client
}

func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}

func (c *Client) Start(ctx context.Context) {
go func() {

Expand Down Expand Up @@ -299,6 +310,7 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso
"nodeName": c.nodeName,
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
"deviceType": c.deviceType,
})
}
body, _ := json.Marshal(postBody)
Expand All @@ -316,6 +328,7 @@ func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.Ra
"peerId": c.peerId,
"nodeName": c.nodeName,
"statusVersion": c.version,
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Expand All @@ -333,6 +346,7 @@ func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.Raw
"nodeName": c.nodeName,
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Expand All @@ -351,6 +365,7 @@ func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSe
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"error": errorSendingEnvelope.Error.Error(),
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Expand All @@ -365,6 +380,7 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Expand All @@ -380,6 +396,7 @@ func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.R
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Expand All @@ -402,6 +419,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
"peerId": c.peerId,
"nodeName": c.nodeName,
"processingError": errorString,
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
Expand Down
8 changes: 8 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type ErrorSendingEnvelope struct {
}

type ITelemetryClient interface {
SetDeviceType(deviceType string)
PushReceivedEnvelope(receivedEnvelope *protocol.Envelope)
PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
Expand Down Expand Up @@ -198,6 +199,13 @@ func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) {
w.statusTelemetryClient = client
}

func (w *Waku) InitTelemetryClient(deviceType string) {
if w.statusTelemetryClient == nil {
return
}
w.statusTelemetryClient.SetDeviceType(deviceType)
}

func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] {
cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL))
go cache.Start()
Expand Down

0 comments on commit c44f9fc

Please sign in to comment.