From 120bfe52b4d0404084ea9bcd8585709825899098 Mon Sep 17 00:00:00 2001 From: "k." Date: Mon, 14 Oct 2024 18:43:17 +0000 Subject: [PATCH] fix(handler,sever): fixing wrong query and event handle, wrong metrics for subs. (#69) --- config/config.yml | 15 ++++++++++----- config/config_test.go | 2 +- config/parameters.go | 2 +- handler/event.go | 7 +------ handler/handler.go | 37 +++++++++++++++++++++++++++++++------ handler/req.go | 25 ++++++++++++++++++++----- makefile | 5 ----- server/http/health.go | 20 -------------------- server/http/server.go | 2 +- server/websocket/server.go | 12 +++++++++--- types/kind.go | 10 +++++----- version.go | 4 ++-- 12 files changed, 81 insertions(+), 60 deletions(-) diff --git a/config/config.yml b/config/config.yml index 8704fab..f3eb4b5 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,9 +1,7 @@ # This is default immortal config file. # This config file contains essential information which is needed for bootstrapping. -# The rest of configs such as limitations or NIP-11 profile must be changed on database config table, -# which is defined and documented on documents/config.md. +# The rest of configs such as limitations or NIP-11 profile must be changed on database config table. -# todo(@ZigBalthazar): move development notes to documents directory and define development docker files. # developers note: for development, set environment to `"dev"` and make a config.yml and .env beside your build. # environment determines where and how to read secrets. (dev/prod) @@ -15,12 +13,15 @@ ws_server: # bind is the IP address to be bind and listen on. # default if local host. bind: "0.0.0.0" + # port is websocket port to be listen on. # default is 7777. - port: 9090 + port: 7777 + # known_bloom_size is the size of bloom filter to check and avoid trying to store/broadcast existing events. # default is 1M events. known_bloom_size: 1000000 + # bloom_backup_path is the path to store bloom filter when relay shutdown. # default is immo_bloom_backup. bloom_backup_path: "immo_bloom_backup" @@ -30,17 +31,21 @@ http_server: # bind is the IP address to be bind and listen on. # default if local host. bind: "0.0.0.0" + # port is websocket port to be listen on. # default is 8888. - port: 8080 + port: 8888 # database contains details of database connections and limitations. database: # db_name is the name of mongodb related to immortal + # default is immortal. db_name: "immortal" + # query_timeout_in_ms specifies the maximum duration (in milliseconds) for query execution before timing out. # default is 3000. query_timeout_in_ms: 3000 + # connection_timeout_in_ms specifies the maximum duration (in milliseconds) that is used for creating connections to the server. # default is 5000. connection_timeout_in_ms: 5000 diff --git a/config/config_test.go b/config/config_test.go index bb30aac..6deac05 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -12,6 +12,6 @@ func TestLoadfromFile(t *testing.T) { cfg, err := config.Load("./config.yml") require.NoError(t, err, "error must be nil.") - assert.Equal(t, uint16(9090), cfg.WebsocketServer.Port) + assert.Equal(t, uint16(7777), cfg.WebsocketServer.Port) assert.Equal(t, "0.0.0.0", cfg.WebsocketServer.Bind) } diff --git a/config/parameters.go b/config/parameters.go index c2c6589..c39b1b2 100644 --- a/config/parameters.go +++ b/config/parameters.go @@ -96,7 +96,7 @@ func (c *Config) LoadParameters(db *database.Database) error { AuthRequired: false, // Whether authentication is required for writes PaymentRequired: false, // Whether payment is required to interact with the relay RestrictedWrites: false, // Whether writes are restricted to authenticated or paying users - MaxEventTags: 200, // Maximum number of tags allowed in a single event + MaxEventTags: 1000, // Maximum number of tags allowed in a single event MaxContentLength: 4096, // Maximum content length of an event (in bytes) CreatedAtLowerLimit: 0, // Earliest timestamp allowed for event creation CreatedAtUpperLimit: 0, // Latest timestamp allowed for event creation (0 for no limit) diff --git a/handler/event.go b/handler/event.go index 3bb31a8..a9bd954 100644 --- a/handler/event.go +++ b/handler/event.go @@ -3,7 +3,6 @@ package handler import ( "context" "errors" - "fmt" "github.com/dezh-tech/immortal/types/event" "go.mongodb.org/mongo-driver/bson" @@ -11,12 +10,8 @@ import ( ) func (h *Handler) HandleEvent(e *event.Event) error { - collName, ok := KindToCollectionName[e.Kind] - if !ok { - return fmt.Errorf("kind %d is not supported yet", e.Kind) - } + coll := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(e.Kind)) - coll := h.db.Client.Database(h.db.DBName).Collection(collName) ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout) defer cancel() diff --git a/handler/handler.go b/handler/handler.go index 26b5f26..27202b7 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -39,7 +39,7 @@ var KindToCollectionName = map[types.Kind]string{ types.KindPatches: "patches", types.KindIssues: "issues", types.KindReplies: "replies", - types.KindStatus: "status_updates", + types.KindStatus: "status", types.KindProblemTracker: "problem_trackers", types.KindReporting: "reportings", types.KindLabel: "labels", @@ -49,10 +49,10 @@ var KindToCollectionName = map[types.Kind]string{ types.KindTorrentComment: "torrent_comments", types.KindCoinJoinPool: "coin_join_pools", types.KindCommunityPostApproval: "community_post_approvals", - types.KindJobRequest: "job_requests", - types.KindJobResult: "job_results", - types.KindJobFeedback: "job_feedbacks", - types.KindGroupControlEvents: "group_control_events", + types.KindJobRequest: "dvm", + types.KindJobResult: "dvm", + types.KindJobFeedback: "dvm", + types.KindGroups: "groups", types.KindZapGoal: "zap_goals", types.KindTidalLogin: "tidal_logins", types.KindZapRequest: "zap_requests", @@ -93,7 +93,32 @@ var KindToCollectionName = map[types.Kind]string{ types.KindShortFormPortraitVideoEvent: "short_form_portrait_video_events", types.KindVideoViewEvent: "video_view_events", types.KindCommunityDefinition: "community_definitions", - types.KindGroupMetadataEvents: "group_metadata_events", + types.KindGroupsMetadata: "groups_metadata", +} + +func getCollectionName(k types.Kind) string { + collName, ok := KindToCollectionName[k] + if ok { + return collName + } + + if k >= 9000 && k <= 9030 { + return "groups" + } + + if k >= 1630 && k <= 1633 { + return "status" + } + + if k >= 39000 && k <= 39009 { + return "groups_metadata" + } + + if k >= 5000 && k <= 5999 || k >= 6000 && k <= 6999 || k == 7000 { + return "dvm" + } + + return "unknown" } type Handler struct { diff --git a/handler/req.go b/handler/req.go index 35cb1d7..da3c58d 100644 --- a/handler/req.go +++ b/handler/req.go @@ -10,6 +10,13 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) +var possibleKinds = []types.Kind{ + types.KindUserMetadata, + types.KindShortTextNote, + types.KindZap, + types.KindRelayListMetadata, +} + type filterQuery struct { Tags map[string][]string @@ -37,16 +44,24 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) { Limit: f.Limit, } - uniqueKinds := removeDuplicateKind(f.Kinds) - for _, k := range uniqueKinds { - queryKinds[k] = append(queryKinds[k], qf) + if len(f.Kinds) != 0 { + uniqueKinds := removeDuplicateKind(f.Kinds) + for _, k := range uniqueKinds { + queryKinds[k] = append(queryKinds[k], qf) + } + } else { + //! we query most requested kinds if there is no kind provided. + // FIX: any better way? + for _, k := range possibleKinds { + queryKinds[k] = append(queryKinds[k], qf) + } } } var finalResult []event.Event for kind, filters := range queryKinds { - collection := h.db.Client.Database(h.db.DBName).Collection(KindToCollectionName[kind]) + collection := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind)) for _, f := range filters { query, opts, err := h.FilterToQuery(&f) if err != nil { @@ -84,7 +99,7 @@ func removeDuplicateKind(intSlice []types.Kind) []types.Kind { } func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions, error) { - var query bson.D + query := make(bson.D, 0) opts := options.Find() // Filter by IDs diff --git a/makefile b/makefile index ac7dfe0..14231a9 100644 --- a/makefile +++ b/makefile @@ -7,7 +7,6 @@ devtools: go install mvdan.cc/gofumpt@latest go install github.com/volatiletech/sqlboiler/v4@latest go install github.com/volatiletech/sqlboiler/v4/drivers/sqlboiler-psql@latest - # TODO ::: go-migrate ### Testing unit-test: @@ -42,8 +41,4 @@ compose-up: compose-down: docker-compose down -### sqlBoiler - models-generate: - sqlboiler psql - .PHONY: build diff --git a/server/http/health.go b/server/http/health.go index bc1f7c8..38fe4c8 100644 --- a/server/http/health.go +++ b/server/http/health.go @@ -3,7 +3,6 @@ package http import ( "context" "net/http" - "runtime" ) type status string @@ -15,14 +14,6 @@ const ( statusTimeout status = "Timeout during health check" ) -type system struct { - Version string `json:"version"` - GoroutinesCount int `json:"goroutines_count"` - TotalAllocBytes uint64 `json:"total_alloc_bytes"` - HeapObjectsCount uint64 `json:"heap_objects_count"` - AllocBytes uint64 `json:"alloc_bytes"` -} - type service struct { Name string `json:"name"` Status bool `json:"status"` @@ -32,22 +23,11 @@ type service struct { type healthResponse struct { Status status `json:"status"` Database service `json:"databse"` - System system `json:"system"` } func (s *Server) healthHandler(w http.ResponseWriter, _ *http.Request) { - ms := runtime.MemStats{} - runtime.ReadMemStats(&ms) - resp := healthResponse{ Status: statusOK, - System: system{ - Version: runtime.Version(), - GoroutinesCount: runtime.NumGoroutine(), - TotalAllocBytes: ms.Alloc, - HeapObjectsCount: ms.HeapObjects, - AllocBytes: ms.Alloc, - }, Database: service{ Name: "mongo_db", }, diff --git a/server/http/server.go b/server/http/server.go index d51f28b..89fcab6 100644 --- a/server/http/server.go +++ b/server/http/server.go @@ -26,7 +26,7 @@ func New(cfg Config, db *database.Database) (*Server, error) { } r.HandleFunc("/health", s.healthHandler).Methods("GET") - r.Handle("/metrics", promhttp.Handler()) + r.Handle("/metrics", promhttp.Handler()).Methods("GET") return s, nil } diff --git a/server/websocket/server.go b/server/websocket/server.go index 9ca4d34..d6a8e3f 100644 --- a/server/websocket/server.go +++ b/server/websocket/server.go @@ -111,6 +111,11 @@ func (s *Server) readLoop(conn *websocket.Conn) { s.metrics.Connections.Dec() + client, ok := s.conns[conn] + if ok { + s.metrics.Subscriptions.Sub(float64(len(client.subs))) + } + delete(s.conns, conn) s.mu.Unlock() @@ -160,7 +165,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { return } - if len(msg.Filters) > s.config.Limitation.MaxFilters { + if len(msg.Filters) >= s.config.Limitation.MaxFilters { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of filters is: %d", s.config.Limitation.MaxFilters))) @@ -169,7 +174,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { return } - if s.config.Limitation.MaxSubidLength < len(msg.SubscriptionID) { + if s.config.Limitation.MaxSubidLength <= len(msg.SubscriptionID) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of sub id is: %d", s.config.Limitation.MaxSubidLength))) @@ -188,7 +193,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { return } - if len(client.subs) > s.config.Limitation.MaxSubscriptions { + if len(client.subs) >= s.config.Limitation.MaxSubscriptions { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of subs is: %d", s.config.Limitation.MaxSubscriptions))) @@ -199,6 +204,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { res, err := s.handlers.HandleReq(msg.Filters) if err != nil { + log.Println(err.Error()) _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't process REQ message: %s", err.Error()))) status = databaseFail diff --git a/types/kind.go b/types/kind.go index d1a1e17..4ada776 100644 --- a/types/kind.go +++ b/types/kind.go @@ -46,7 +46,7 @@ const ( KindPatches Kind = 1617 KindIssues Kind = 1621 KindReplies Kind = 1622 - KindStatus Kind = 1630 // to 1633. support: todo. + KindStatus Kind = 1630 KindProblemTracker Kind = 1971 KindReporting Kind = 1984 KindLabel Kind = 1985 @@ -56,10 +56,10 @@ const ( KindTorrentComment Kind = 2004 KindCoinJoinPool Kind = 2022 KindCommunityPostApproval Kind = 4550 - KindJobRequest Kind = 5000 // to 5999. support: todo. - KindJobResult Kind = 6000 // to 6999. support: todo. + KindJobRequest Kind = 5000 + KindJobResult Kind = 6000 KindJobFeedback Kind = 7000 - KindGroupControlEvents Kind = 9000 // to 9030. support: todo. + KindGroups Kind = 9000 KindZapGoal Kind = 9041 KindTidalLogin Kind = 9467 KindZapRequest Kind = 9734 @@ -127,7 +127,7 @@ const ( KindShortFormPortraitVideoEvent Kind = 34236 KindVideoViewEvent Kind = 34237 KindCommunityDefinition Kind = 34550 - KindGroupMetadataEvents Kind = 39000 // to 39009. support: todo. + KindGroupsMetadata Kind = 39000 ) // IsRegular checks if the given kind is in Regular range. diff --git a/version.go b/version.go index 1769683..4eac770 100644 --- a/version.go +++ b/version.go @@ -7,8 +7,8 @@ import "fmt" var ( major = 0 minor = 0 - patch = 1 - meta = "" + patch = 3 + meta = "beta" ) func StringVersion() string {