Skip to content

Commit

Permalink
feat: supporting nip-09, accepting 1 filter per req message, using ag…
Browse files Browse the repository at this point in the history
…gregation pipeline on req.:
  • Loading branch information
kehiy committed Jan 12, 2025
1 parent 89255a1 commit 87747c6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
13 changes: 7 additions & 6 deletions delivery/websocket/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// handleEvent handles new incoming EVENT messages from client.
func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { //nolint
s.mu.Lock()
defer s.mu.Unlock()
defer measureLatency(s.metrics.EventLatency)()
Expand Down Expand Up @@ -176,12 +176,13 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {

if !msg.Event.Kind.IsEphemeral() {
if msg.Event.Kind == types.KindEventDeletionRequest {
filterString := msg.Event.Tags.GetValue("filter")
filter, err := filter.Decode([]byte(filterString))
deleteFilterString := msg.Event.Tags.GetValue("filter")

deleteFilter, err := filter.Decode([]byte(deleteFilterString))
if err != nil {
okm := message.MakeOK(false,
msg.Event.ID,
fmt.Sprintf("error: parse deletion event: %s", filterString),
fmt.Sprintf("error: parse deletion event: %s", deleteFilterString),
)

_ = conn.WriteMessage(1, okm)
Expand All @@ -192,9 +193,9 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) {
}

// you can only delete events you own.
filter.Authors = []string{msg.Event.PublicKey}
deleteFilter.Authors = []string{msg.Event.PublicKey}

go s.handler.DeleteByFilter(filter)
go s.handler.DeleteByFilter(deleteFilter) //nolint
}

if err := s.handler.HandleEvent(msg.Event); err != nil {
Expand Down
21 changes: 13 additions & 8 deletions repository/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,20 @@ func (h *Handler) DeleteByFilter(f *filter.Filter) error {
// helps us ti prevent multiple database calls and it would help us to do the operation faster.
// to do the same thing for deletion we need to filter the documents with $match, then update the
// fields of deleted event to null (expect the `id` since its unique index to prevent overwrites) with $unset
// then we apply them to collection using $merge. although we can't use multiple $merge's on one pipeline and we must have
// then we apply them to collection using $merge.
// although we can't use multiple $merge's on one pipeline and we must have
// only one merge at the end of pipeline commands. also, $unionWith is restricted to be used with $merge.

// notes::: these details may help you to think for solutions better:
// 1. we create a collection for each kind or each group of kinds.
// using this model forces us to make query to all collections corresponding to provided kinds when
// we are dealing with filters since filters contain a list of kinds (which can be empty and we are then forced to query all collections)
// we are dealing with filters since filters contain a list of kinds
// (which can be empty and we are then forced to query all collections)

// 2. when we delete an event we $unset all fields expect `id`. when we make a query to read from database, we ignore fields which
// their fields are null. and when we write new events we prevent overwriting events with duplicated `id`. so we can handle the deletion properly.
// 2. when we delete an event we $unset all fields expect `id`.
// when we make a query to read from database, we ignore fields which
// their fields are null. and when we write new events we prevent overwriting
// events with duplicated `id`. so we can handle the deletion properly.

// resources::: these links may help you:
// 1. https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/#restrictions
Expand Down Expand Up @@ -92,13 +96,12 @@ func (h *Handler) DeleteByFilter(f *filter.Filter) error {
}},
}

for kind, filter := range queryKinds {
for kind, deleteFilter := range queryKinds {
collectionName, isMultiKindColl := getCollectionName(kind)

query := filterToMongoQuery(filter, isMultiKindColl, kind)
query := filterToMongoQuery(deleteFilter, isMultiKindColl, kind)

ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

_, err := h.db.Client.Database(h.db.DBName).Collection(collectionName).UpdateMany(ctx, query, update)
if err != nil {
Expand All @@ -108,9 +111,11 @@ func (h *Handler) DeleteByFilter(f *filter.Filter) error {
logger.Error("can't send log to manager", "err", err)
}

cancel()

return err
}

cancel()
}

return nil
Expand Down
2 changes: 0 additions & 2 deletions repository/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) {
}

return nil, err

}
defer cursor.Close(ctx)

Expand All @@ -93,7 +92,6 @@ func (h *Handler) HandleReq(f *filter.Filter) ([]event.Event, error) {
}

return nil, err

}

return finalResult, nil
Expand Down

0 comments on commit 87747c6

Please sign in to comment.