Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry)_: include device type in metrics #5669

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,10 @@ func NewMessenger(

var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, telemetry.WithPeerID(peerId.String()))
options := []telemetry.TelemetryClientOption{
telemetry.WithPeerID(peerId.String()),
}
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
Expand Down
7 changes: 7 additions & 0 deletions protocol/messenger_pairing_and_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ func (m *Messenger) InitInstallations() error {
return err
}

if m.telemetryClient != nil {
installation, ok := m.allInstallations.Load(m.installationID)
if ok {
m.telemetryClient.SetDeviceType(installation.InstallationMetadata.DeviceType)
}
}

return nil
}

Expand Down
118 changes: 54 additions & 64 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Client struct {
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
deviceType string
}

type TelemetryClientOption func(*Client)
Expand Down Expand Up @@ -152,6 +153,10 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
return client
}

func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}

func (c *Client) Start(ctx context.Context) {
go func() {
for {
Expand Down Expand Up @@ -287,104 +292,88 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
return nil
}

func (c *Client) commonPostBody() map[string]interface{} {
return map[string]interface{}{
"nodeName": c.nodeName,
"peerId": c.peerId,
"statusVersion": c.version,
"deviceType": c.deviceType,
"timestamp": time.Now().Unix(),
}
}

func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
var postBody []map[string]interface{}
for _, message := range receivedMessages.Messages {
postBody = append(postBody, map[string]interface{}{
"chatId": receivedMessages.Filter.ChatID,
"messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash),
"messageId": message.ApplicationLayer.ID,
"sentAt": receivedMessages.SSHMessage.Timestamp,
"pubsubTopic": receivedMessages.Filter.PubsubTopic,
"topic": receivedMessages.Filter.ContentTopic.String(),
"messageType": message.ApplicationLayer.Type.String(),
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
})
messageBody := c.commonPostBody()
messageBody["chatId"] = receivedMessages.Filter.ChatID
messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash)
messageBody["messageId"] = message.ApplicationLayer.ID
messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp
messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic
messageBody["topic"] = receivedMessages.Filter.ContentTopic.String()
messageBody["messageType"] = message.ApplicationLayer.Type.String()
messageBody["receiverKeyUID"] = c.keyUID
messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload)
postBody = append(postBody, messageBody)
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"statusVersion": c.version,
}
postBody := c.commonPostBody()
postBody["messageHash"] = envelope.Hash().String()
postBody["sentAt"] = uint32(envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = envelope.PubsubTopic()
postBody["topic"] = envelope.Message().ContentTopic
postBody["receiverKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": sentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
"topic": sentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
}
postBody := c.commonPostBody()
postBody["messageHash"] = sentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = sentEnvelope.PublishMethod.String()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
"topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"error": errorSendingEnvelope.Error.Error(),
}
postBody := c.commonPostBody()
postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String()
postBody["error"] = errorSendingEnvelope.Error.Error()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
postBody := map[string]interface{}{
"peerCount": peerCount.PeerCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
postBody := c.commonPostBody()
postBody["peerCount"] = peerCount.PeerCount
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"failedPeerId": peerConnFailure.FailedPeerId,
"failureCount": peerConnFailure.FailureCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
postBody := c.commonPostBody()
postBody["failedPeerId"] = peerConnFailure.FailedPeerId
postBody["failureCount"] = peerConnFailure.FailureCount
postBody["nodeKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
Expand All @@ -406,6 +395,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
"peerId": c.peerId,
"nodeName": c.nodeName,
"processingError": errorString,
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
Expand Down
1 change: 1 addition & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type ErrorSendingEnvelope struct {
}

type ITelemetryClient interface {
SetDeviceType(deviceType string)
PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope)
PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)
Expand Down