diff --git a/cmd/main.go b/cmd/main.go index c5f7ab5..db71d96 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,8 @@ package main import "github.com/dezh-tech/immortal/relay" +// TODO::: create a full functioning CLI to manage rely. + func main() { s := relay.NewRelay() err := s.Start() diff --git a/relay/relay.go b/relay/relay.go index 101fb20..eaf7673 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -14,7 +14,9 @@ import ( ) // TODO::: replace with https://github.com/coder/websocket. +// TODO::: replace `log` with main logger. +// Relay represents a nostr relay which keeps track of client connections and handle them. type Relay struct { conns map[*websocket.Conn]map[string]filter.Filters connsLock sync.RWMutex @@ -27,6 +29,7 @@ func NewRelay() *Relay { } } +// Start strats a new relay instance. func (r *Relay) Start() error { http.Handle("/ws", websocket.Handler(r.handleWS)) err := http.ListenAndServe(":3000", nil) //nolint @@ -34,8 +37,8 @@ func (r *Relay) Start() error { return err } +// handleWS is WebSocket handler. func (r *Relay) handleWS(ws *websocket.Conn) { - // TODO::: replace with logger. log.Printf("new connection: %s\n", ws.RemoteAddr()) r.connsLock.Lock() @@ -45,6 +48,7 @@ func (r *Relay) handleWS(ws *websocket.Conn) { r.readLoop(ws) } +// readLoop reads incoming messages from a client and answer to them. func (r *Relay) readLoop(ws *websocket.Conn) { buf := make([]byte, 1024) for { @@ -54,7 +58,6 @@ func (r *Relay) readLoop(ws *websocket.Conn) { break } - // TODO::: replace with logger. log.Printf("error in connection handling: %s\n", err) continue @@ -67,29 +70,30 @@ func (r *Relay) readLoop(ws *websocket.Conn) { continue } - // TODO::: replace with logger. log.Printf("received envelope: %s\n", msg.String()) switch msg.Type() { case "REQ": - go r.HandleReq(ws, msg) + go r.handleReq(ws, msg) case "EVENT": - go r.HandleEvent(ws, msg) + go r.handleEvent(ws, msg) case "CLOSE": - go r.HandleClose(ws, msg) + r.handleClose(ws, msg) } } } -func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) { +// handleReq handles new incoming REQ messages from client. +func (r *Relay) handleReq(ws *websocket.Conn, m message.Message) { // TODO::: loadfrom database and sent in first query based on limit. // TODO::: return EOSE. + // TODO::: return EVENT messages. msg, ok := m.(*message.Req) if !ok { - _, _ = ws.Write(message.MakeNotice("error: can't parse REQ message")) + _, _ = ws.Write(message.MakeNotice("error: can't parse REQ message.")) return } @@ -99,25 +103,22 @@ func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) { subs, ok := r.conns[ws] if !ok { - _, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s", + _, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.", ws.RemoteAddr()))) return } subs[msg.SubscriptionID] = msg.Filters - - // TODO::: return EVENT messages. } -func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) { - // TODO::: send events to be stored and proccessed. - +// handleEvent handles new incoming EVENT messages from client. +func (r *Relay) handleEvent(ws *websocket.Conn, m message.Message) { msg, ok := m.(*message.Event) if !ok { okm := message.MakeOK(false, "", - "error: can't parse the message.", // TODO::: make an error builder. + "error: can't parse EVENT message.", ) _, _ = ws.Write(okm) @@ -128,7 +129,7 @@ func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) { if !msg.Event.IsValid() { okm := message.MakeOK(false, msg.SubscriptionID, - "invalid: invalid id or sig.", // TODO::: make an error builder. + "invalid: id or sig is not correct.", ) _, _ = ws.Write(okm) @@ -140,21 +141,19 @@ func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) { for conn, subs := range r.conns { for id, filters := range subs { - // is this concurrent safe? - go func(conn *websocket.Conn, id string, filters filter.Filters) { - if !filters.Match(msg.Event) { - return - } - _, _ = conn.Write(message.MakeEvent(id, msg.Event)) - }(conn, id, filters) + if !filters.Match(msg.Event) { + return + } + _, _ = conn.Write(message.MakeEvent(id, msg.Event)) } } } -func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) { +// handleClose handles new incoming CLOSE messages from client. +func (r *Relay) handleClose(ws *websocket.Conn, m message.Message) { msg, ok := m.(*message.Close) if !ok { - _, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message")) + _, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message.")) return } @@ -164,14 +163,14 @@ func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) { conn, ok := r.conns[ws] if !ok { - _, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s", + _, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.", ws.RemoteAddr()))) return } delete(conn, msg.String()) - _, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully")) + _, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully.")) } // Stop shutdowns the relay gracefully. @@ -180,9 +179,12 @@ func (r *Relay) Stop() error { defer r.connsLock.Unlock() for wsConn, subs := range r.conns { + // close all subscriptions. for id := range subs { - _, _ = wsConn.Write(message.MakeClosed(id, "relay is stopping.")) + _, _ = wsConn.Write(message.MakeClosed(id, "error: shutdowning the relay.")) } + + // close connection. _ = wsConn.Close() } diff --git a/tmp/client_recvr.js b/tmp/client_recvr.js deleted file mode 100644 index bd37844..0000000 --- a/tmp/client_recvr.js +++ /dev/null @@ -1,14 +0,0 @@ -const req = `["REQ","nak",{"ids":["cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02"]}]`; -const close = `["CLOSE", "nak"]`; - -let ws = new WebSocket("ws://localhost:3000/ws"); - -ws.onmessage = (e) => { - console.log(e.data); -}; - -ws.send( - `["REQ","nak",{"ids":["cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02"]}]`, -); - -// ws.send(close); diff --git a/tmp/client_sendr.js b/tmp/client_sendr.js deleted file mode 100644 index d9b5e1e..0000000 --- a/tmp/client_sendr.js +++ /dev/null @@ -1,9 +0,0 @@ -let ws = new WebSocket("ws://localhost:3000/ws"); - -ws.onmessage = (e) => { - console.log(e.data); -}; - -ws.send( - `["EVENT",{"kind":1,"id":"cbba15aff4ed4db6370834c9370436ba20615ffa2d170515058f11e522c8dc02","pubkey":"79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798","created_at":1726056813,"tags":[],"content":"test","sig":"282db08a865b5fe97d2351a3e7321ee279b9e6bd16e4e8d9f746c4ea148337e01bae7bc10f7465bdffe740bf6682d7aaadd4777919891ba845e66bf52ee7b6f8"}]`, -);