Skip to content

Commit

Permalink
chore_: remove waku's Message duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
osmaczko committed Jan 21, 2025
1 parent e78a612 commit b2bb680
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 85 deletions.
22 changes: 1 addition & 21 deletions waku/bridge/public_waku_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,7 @@ func (w *GethPublicWakuAPIWrapper) BloomFilter() []byte {
// GetFilterMessages returns the messages that match the filter criteria and
// are received between the last poll and now.
func (w *GethPublicWakuAPIWrapper) GetFilterMessages(id string) ([]*wakutypes.Message, error) {
msgs, err := w.api.GetFilterMessages(id)
if err != nil {
return nil, err
}

wrappedMsgs := make([]*wakutypes.Message, len(msgs))
for index, msg := range msgs {
wrappedMsgs[index] = &wakutypes.Message{
Sig: msg.Sig,
TTL: msg.TTL,
Timestamp: msg.Timestamp,
Topic: wakutypes.TopicType(msg.Topic),
Payload: msg.Payload,
Padding: msg.Padding,
PoW: msg.PoW,
Hash: msg.Hash,
Dst: msg.Dst,
P2P: msg.P2P,
}
}
return wrappedMsgs, nil
return w.api.GetFilterMessages(id)
}

// Post posts a message on the network.
Expand Down
20 changes: 1 addition & 19 deletions waku/bridge/public_wakuv2_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,7 @@ func (w *gethPublicWakuV2APIWrapper) NewMessageFilter(req wakutypes.Criteria) (s
// GetFilterMessages returns the messages that match the filter criteria and
// are received between the last poll and now.
func (w *gethPublicWakuV2APIWrapper) GetFilterMessages(id string) ([]*wakutypes.Message, error) {
msgs, err := w.api.GetFilterMessages(id)
if err != nil {
return nil, err
}

wrappedMsgs := make([]*wakutypes.Message, len(msgs))
for index, msg := range msgs {
wrappedMsgs[index] = &wakutypes.Message{
Sig: msg.Sig,
Timestamp: msg.Timestamp,
PubsubTopic: msg.PubsubTopic,
Topic: wakutypes.TopicType(msg.ContentTopic),
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
Dst: msg.Dst,
}
}
return wrappedMsgs, nil
return w.api.GetFilterMessages(id)
}

// Post posts a message on the network.
Expand Down
28 changes: 7 additions & 21 deletions wakuv1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,30 +402,16 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r
return rpcSub, nil
}

// Message is the RPC representation of a waku message.
type Message struct {
Sig []byte `json:"sig,omitempty"`
TTL uint32 `json:"ttl"`
Timestamp uint32 `json:"timestamp"`
Topic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
PoW float64 `json:"pow"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
P2P bool `json:"bool,omitempty"`
}

// ToWakuMessage converts an internal message into an API version.
func ToWakuMessage(message *common.ReceivedMessage) *Message {
msg := Message{
func ToWakuMessage(message *common.ReceivedMessage) *types.Message {
msg := types.Message{
Payload: message.Payload,
Padding: message.Padding,
Timestamp: message.Sent,
TTL: message.TTL,
PoW: message.PoW,
Hash: message.EnvelopeHash.Bytes(),
Topic: message.Topic,
Topic: types.TopicType(message.Topic),
P2P: message.P2P,
}

Expand All @@ -447,8 +433,8 @@ func ToWakuMessage(message *common.ReceivedMessage) *Message {
}

// toMessage converts a set of messages to its RPC representation.
func toMessage(messages []*common.ReceivedMessage) []*Message {
msgs := make([]*Message, len(messages))
func toMessage(messages []*common.ReceivedMessage) []*types.Message {
msgs := make([]*types.Message, len(messages))
for i, msg := range messages {
msgs[i] = ToWakuMessage(msg)
}
Expand All @@ -457,7 +443,7 @@ func toMessage(messages []*common.ReceivedMessage) []*Message {

// GetFilterMessages returns the messages that match the filter criteria and
// are received between the last poll and now.
func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*types.Message, error) {
logger := api.w.logger.With(zap.String("site", "getFilterMessages"), zap.String("filterId", id))
api.mu.Lock()
f := api.w.GetFilter(id)
Expand All @@ -469,7 +455,7 @@ func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
api.mu.Unlock()

receivedMessages := f.Retrieve()
messages := make([]*Message, 0, len(receivedMessages))
messages := make([]*types.Message, 0, len(receivedMessages))
for _, msg := range receivedMessages {

logger.Debug("retrieved filter message", zap.String("hash", msg.EnvelopeHash.String()), zap.Bool("isP2P", msg.P2P), zap.String("topic", msg.Topic.String()))
Expand Down
36 changes: 12 additions & 24 deletions wakuv2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,27 +361,15 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit types.Criteria) (*r
return rpcSub, nil
}

// Message is the RPC representation of a waku message.
type Message struct {
Sig []byte `json:"sig,omitempty"`
Timestamp uint32 `json:"timestamp"`
PubsubTopic string `json:"pubsubTopic"`
ContentTopic common.TopicType `json:"topic"`
Payload []byte `json:"payload"`
Padding []byte `json:"padding"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
}

// ToWakuMessage converts an internal message into an API version.
func ToWakuMessage(message *common.ReceivedMessage) *Message {
msg := Message{
Payload: message.Data,
Padding: message.Padding,
Timestamp: message.Sent,
Hash: message.Hash().Bytes(),
PubsubTopic: message.PubsubTopic,
ContentTopic: message.ContentTopic,
func ToWakuMessage(message *common.ReceivedMessage) *types.Message {
msg := types.Message{
Payload: message.Data,
Padding: message.Padding,
Timestamp: message.Sent,
Hash: message.Hash().Bytes(),
PubsubTopic: message.PubsubTopic,
Topic: types.TopicType(message.ContentTopic),
}

if message.Dst != nil {
Expand All @@ -402,8 +390,8 @@ func ToWakuMessage(message *common.ReceivedMessage) *Message {
}

// toMessage converts a set of messages to its RPC representation.
func toMessage(messages []*common.ReceivedMessage) []*Message {
msgs := make([]*Message, len(messages))
func toMessage(messages []*common.ReceivedMessage) []*types.Message {
msgs := make([]*types.Message, len(messages))
for i, msg := range messages {
msgs[i] = ToWakuMessage(msg)
}
Expand All @@ -412,7 +400,7 @@ func toMessage(messages []*common.ReceivedMessage) []*Message {

// GetFilterMessages returns the messages that match the filter criteria and
// are received between the last poll and now.
func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*types.Message, error) {
api.mu.Lock()
f := api.w.GetFilter(id)
if f == nil {
Expand All @@ -423,7 +411,7 @@ func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
api.mu.Unlock()

receivedMessages := f.Retrieve()
messages := make([]*Message, 0, len(receivedMessages))
messages := make([]*types.Message, 0, len(receivedMessages))
for _, msg := range receivedMessages {
messages = append(messages, ToWakuMessage(msg))
}
Expand Down

0 comments on commit b2bb680

Please sign in to comment.