diff --git a/protocol/messenger.go b/protocol/messenger.go index a2987eb348d..d2447073977 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -554,7 +554,10 @@ func NewMessenger( var telemetryClient *telemetry.Client if c.telemetryServerURL != "" { - telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, telemetry.WithPeerID(peerId.String())) + options := []telemetry.TelemetryClientOption{ + telemetry.WithPeerID(peerId.String()), + } + telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...) if c.wakuService != nil { c.wakuService.SetStatusTelemetryClient(telemetryClient) } diff --git a/protocol/messenger_pairing_and_syncing.go b/protocol/messenger_pairing_and_syncing.go index e071f559164..3ccc1554d97 100644 --- a/protocol/messenger_pairing_and_syncing.go +++ b/protocol/messenger_pairing_and_syncing.go @@ -415,6 +415,13 @@ func (m *Messenger) InitInstallations() error { return err } + if m.telemetryClient != nil { + installation, ok := m.allInstallations.Load(m.installationID) + if ok { + m.telemetryClient.SetDeviceType(installation.InstallationMetadata.DeviceType) + } + } + return nil } diff --git a/telemetry/client.go b/telemetry/client.go index 85b6112bb93..66019c011ea 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -109,6 +109,7 @@ type Client struct { sendPeriod time.Duration lastPeerCount int lastPeerConnFailures map[string]int + deviceType string } type TelemetryClientOption func(*Client) @@ -152,6 +153,10 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str return client } +func (c *Client) SetDeviceType(deviceType string) { + c.deviceType = deviceType +} + func (c *Client) Start(ctx context.Context) { go func() { for { @@ -287,23 +292,30 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { return nil } +func (c *Client) commonPostBody() map[string]interface{} { + return map[string]interface{}{ + "nodeName": c.nodeName, + "peerId": c.peerId, + "statusVersion": c.version, + "deviceType": c.deviceType, + "timestamp": time.Now().Unix(), + } +} + func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { var postBody []map[string]interface{} for _, message := range receivedMessages.Messages { - postBody = append(postBody, map[string]interface{}{ - "chatId": receivedMessages.Filter.ChatID, - "messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash), - "messageId": message.ApplicationLayer.ID, - "sentAt": receivedMessages.SSHMessage.Timestamp, - "pubsubTopic": receivedMessages.Filter.PubsubTopic, - "topic": receivedMessages.Filter.ContentTopic.String(), - "messageType": message.ApplicationLayer.Type.String(), - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "messageSize": len(receivedMessages.SSHMessage.Payload), - "statusVersion": c.version, - }) + messageBody := c.commonPostBody() + messageBody["chatId"] = receivedMessages.Filter.ChatID + messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash) + messageBody["messageId"] = message.ApplicationLayer.ID + messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp + messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic + messageBody["topic"] = receivedMessages.Filter.ContentTopic.String() + messageBody["messageType"] = message.ApplicationLayer.Type.String() + messageBody["receiverKeyUID"] = c.keyUID + messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload) + postBody = append(postBody, messageBody) } body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) @@ -311,80 +323,57 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso } func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": envelope.Hash().String(), - "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": envelope.PubsubTopic(), - "topic": envelope.Message().ContentTopic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "statusVersion": c.version, - } + postBody := c.commonPostBody() + postBody["messageHash"] = envelope.Hash().String() + postBody["sentAt"] = uint32(envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = envelope.PubsubTopic() + postBody["topic"] = envelope.Message().ContentTopic + postBody["receiverKeyUID"] = c.keyUID body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": sentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": sentEnvelope.Envelope.PubsubTopic(), - "topic": sentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": sentEnvelope.PublishMethod.String(), - "statusVersion": c.version, - } + postBody := c.commonPostBody() + postBody["messageHash"] = sentEnvelope.Envelope.Hash().String() + postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic() + postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic + postBody["senderKeyUID"] = c.keyUID + postBody["publishMethod"] = sentEnvelope.PublishMethod.String() body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), - "topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(), - "statusVersion": c.version, - "error": errorSendingEnvelope.Error.Error(), - } + postBody := c.commonPostBody() + postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String() + postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic() + postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic + postBody["senderKeyUID"] = c.keyUID + postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String() + postBody["error"] = errorSendingEnvelope.Error.Error() body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { - postBody := map[string]interface{}{ - "peerCount": peerCount.PeerCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), - } + postBody := c.commonPostBody() + postBody["peerCount"] = peerCount.PeerCount body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { - postBody := map[string]interface{}{ - "failedPeerId": peerConnFailure.FailedPeerId, - "failureCount": peerConnFailure.FailureCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), - } + postBody := c.commonPostBody() + postBody["failedPeerId"] = peerConnFailure.FailedPeerId + postBody["failureCount"] = peerConnFailure.FailureCount + postBody["nodeKeyUID"] = c.keyUID body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage @@ -406,6 +395,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces "peerId": c.peerId, "nodeName": c.nodeName, "processingError": errorString, + "deviceType": c.deviceType, } body, _ := json.Marshal(postBody) _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 15fe6a21727..ed5849bd1d1 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -102,6 +102,7 @@ type ErrorSendingEnvelope struct { } type ITelemetryClient interface { + SetDeviceType(deviceType string) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope) PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)