Skip to content

Commit

Permalink
chore(telemetry)_: refactor common fields used across metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Aug 7, 2024
1 parent 4c5f52e commit 711d6d7
Showing 1 changed file with 48 additions and 76 deletions.
124 changes: 48 additions & 76 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ func WithPeerID(peerId string) TelemetryClientOption {
}
}

func WithDeviceType(deviceType string) TelemetryClientOption {
return func(c *Client) {
c.deviceType = deviceType
}
}

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
Expand Down Expand Up @@ -294,110 +288,88 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
return nil
}

func (c *Client) commonPostBody() map[string]interface{} {
return map[string]interface{}{
"nodeName": c.nodeName,
"peerId": c.peerId,
"statusVersion": c.version,
"deviceType": c.deviceType,
"timestamp": time.Now().Unix(),
}
}

func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
var postBody []map[string]interface{}
for _, message := range receivedMessages.Messages {
postBody = append(postBody, map[string]interface{}{
"chatId": receivedMessages.Filter.ChatID,
"messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash),
"messageId": message.ApplicationLayer.ID,
"sentAt": receivedMessages.SSHMessage.Timestamp,
"pubsubTopic": receivedMessages.Filter.PubsubTopic,
"topic": receivedMessages.Filter.ContentTopic.String(),
"messageType": message.ApplicationLayer.Type.String(),
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
"deviceType": c.deviceType,
})
messageBody := c.commonPostBody()
messageBody["chatId"] = receivedMessages.Filter.ChatID
messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash)
messageBody["messageId"] = message.ApplicationLayer.ID
messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp
messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic
messageBody["topic"] = receivedMessages.Filter.ContentTopic.String()
messageBody["messageType"] = message.ApplicationLayer.Type.String()
messageBody["receiverKeyUID"] = c.keyUID
messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload)
postBody = append(postBody, messageBody)
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"statusVersion": c.version,
"deviceType": c.deviceType,
}
postBody := c.commonPostBody()
postBody["messageHash"] = envelope.Hash().String()
postBody["sentAt"] = uint32(envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = envelope.PubsubTopic()
postBody["topic"] = envelope.Message().ContentTopic
postBody["receiverKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": sentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
"topic": sentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"deviceType": c.deviceType,
}
postBody := c.commonPostBody()
postBody["messageHash"] = sentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = sentEnvelope.PublishMethod.String()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
"topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"error": errorSendingEnvelope.Error.Error(),
"deviceType": c.deviceType,
}
postBody := c.commonPostBody()
postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String()
postBody["error"] = errorSendingEnvelope.Error.Error()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
postBody := map[string]interface{}{
"peerCount": peerCount.PeerCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
"deviceType": c.deviceType,
}
postBody := c.commonPostBody()
postBody["peerCount"] = peerCount.PeerCount
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"failedPeerId": peerConnFailure.FailedPeerId,
"failureCount": peerConnFailure.FailureCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
"deviceType": c.deviceType,
}
postBody := c.commonPostBody()
postBody["failedPeerId"] = peerConnFailure.FailedPeerId
postBody["failureCount"] = peerConnFailure.FailureCount
postBody["nodeKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
Expand Down

0 comments on commit 711d6d7

Please sign in to comment.