From 74f9d5bac0acb30993c3583fe15d397d580e4ec8 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Wed, 7 Aug 2024 16:56:36 -0700 Subject: [PATCH] chore(telemetry)_: refactor common fields used across metrics --- telemetry/client.go | 124 +++++++++++++++++--------------------------- 1 file changed, 48 insertions(+), 76 deletions(-) diff --git a/telemetry/client.go b/telemetry/client.go index dba5fc159f6..66019c011ea 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -126,12 +126,6 @@ func WithPeerID(peerId string) TelemetryClientOption { } } -func WithDeviceType(deviceType string) TelemetryClientOption { - return func(c *Client) { - c.deviceType = deviceType - } -} - func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { serverURL = strings.TrimRight(serverURL, "/") client := &Client{ @@ -298,24 +292,30 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { return nil } +func (c *Client) commonPostBody() map[string]interface{} { + return map[string]interface{}{ + "nodeName": c.nodeName, + "peerId": c.peerId, + "statusVersion": c.version, + "deviceType": c.deviceType, + "timestamp": time.Now().Unix(), + } +} + func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { var postBody []map[string]interface{} for _, message := range receivedMessages.Messages { - postBody = append(postBody, map[string]interface{}{ - "chatId": receivedMessages.Filter.ChatID, - "messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash), - "messageId": message.ApplicationLayer.ID, - "sentAt": receivedMessages.SSHMessage.Timestamp, - "pubsubTopic": receivedMessages.Filter.PubsubTopic, - "topic": receivedMessages.Filter.ContentTopic.String(), - "messageType": message.ApplicationLayer.Type.String(), - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "messageSize": len(receivedMessages.SSHMessage.Payload), - "statusVersion": c.version, - "deviceType": c.deviceType, - }) + messageBody := c.commonPostBody() + messageBody["chatId"] = receivedMessages.Filter.ChatID + messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash) + messageBody["messageId"] = message.ApplicationLayer.ID + messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp + messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic + messageBody["topic"] = receivedMessages.Filter.ContentTopic.String() + messageBody["messageType"] = message.ApplicationLayer.Type.String() + messageBody["receiverKeyUID"] = c.keyUID + messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload) + postBody = append(postBody, messageBody) } body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) @@ -323,85 +323,57 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso } func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": envelope.Hash().String(), - "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": envelope.PubsubTopic(), - "topic": envelope.Message().ContentTopic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "statusVersion": c.version, - "deviceType": c.deviceType, - } + postBody := c.commonPostBody() + postBody["messageHash"] = envelope.Hash().String() + postBody["sentAt"] = uint32(envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = envelope.PubsubTopic() + postBody["topic"] = envelope.Message().ContentTopic + postBody["receiverKeyUID"] = c.keyUID body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": sentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": sentEnvelope.Envelope.PubsubTopic(), - "topic": sentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": sentEnvelope.PublishMethod.String(), - "statusVersion": c.version, - "deviceType": c.deviceType, - } + postBody := c.commonPostBody() + postBody["messageHash"] = sentEnvelope.Envelope.Hash().String() + postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic() + postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic + postBody["senderKeyUID"] = c.keyUID + postBody["publishMethod"] = sentEnvelope.PublishMethod.String() body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { - postBody := map[string]interface{}{ - "messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(), - "sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), - "pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), - "topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, - "senderKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(), - "statusVersion": c.version, - "error": errorSendingEnvelope.Error.Error(), - "deviceType": c.deviceType, - } + postBody := c.commonPostBody() + postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String() + postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic() + postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic + postBody["senderKeyUID"] = c.keyUID + postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String() + postBody["error"] = errorSendingEnvelope.Error.Error() body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { - postBody := map[string]interface{}{ - "peerCount": peerCount.PeerCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), - "deviceType": c.deviceType, - } + postBody := c.commonPostBody() + postBody["peerCount"] = peerCount.PeerCount body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage } func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { - postBody := map[string]interface{}{ - "failedPeerId": peerConnFailure.FailedPeerId, - "failureCount": peerConnFailure.FailureCount, - "nodeName": c.nodeName, - "nodeKeyUID": c.keyUID, - "peerId": c.peerId, - "statusVersion": c.version, - "timestamp": time.Now().Unix(), - "deviceType": c.deviceType, - } + postBody := c.commonPostBody() + postBody["failedPeerId"] = peerConnFailure.FailedPeerId + postBody["failureCount"] = peerConnFailure.FailureCount + postBody["nodeKeyUID"] = c.keyUID body, _ := json.Marshal(postBody) jsonRawMessage := json.RawMessage(body) return &jsonRawMessage