Skip to content

Commit

Permalink
finish, last test: pass.
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy committed Sep 11, 2024
1 parent fd94c4c commit c272d43
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 51 deletions.
2 changes: 2 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import "github.com/dezh-tech/immortal/relay"

// TODO::: create a full functioning CLI to manage rely.

func main() {
s := relay.NewRelay()
err := s.Start()
Expand Down
58 changes: 30 additions & 28 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
)

// TODO::: replace with https://github.com/coder/websocket.
// TODO::: replace `log` with main logger.

// Relay represents a nostr relay which keeps track of client connections and handle them.
type Relay struct {
conns map[*websocket.Conn]map[string]filter.Filters
connsLock sync.RWMutex
Expand All @@ -27,15 +29,16 @@ func NewRelay() *Relay {
}
}

// Start strats a new relay instance.
func (r *Relay) Start() error {
http.Handle("/ws", websocket.Handler(r.handleWS))
err := http.ListenAndServe(":3000", nil) //nolint

return err
}

// handleWS is WebSocket handler.
func (r *Relay) handleWS(ws *websocket.Conn) {
// TODO::: replace with logger.
log.Printf("new connection: %s\n", ws.RemoteAddr())

r.connsLock.Lock()
Expand All @@ -45,6 +48,7 @@ func (r *Relay) handleWS(ws *websocket.Conn) {
r.readLoop(ws)
}

// readLoop reads incoming messages from a client and answer to them.
func (r *Relay) readLoop(ws *websocket.Conn) {
buf := make([]byte, 1024)
for {
Expand All @@ -54,7 +58,6 @@ func (r *Relay) readLoop(ws *websocket.Conn) {
break
}

// TODO::: replace with logger.
log.Printf("error in connection handling: %s\n", err)

continue
Expand All @@ -67,29 +70,30 @@ func (r *Relay) readLoop(ws *websocket.Conn) {
continue
}

// TODO::: replace with logger.
log.Printf("received envelope: %s\n", msg.String())

switch msg.Type() {
case "REQ":
go r.HandleReq(ws, msg)
go r.handleReq(ws, msg)

case "EVENT":
go r.HandleEvent(ws, msg)
go r.handleEvent(ws, msg)

case "CLOSE":
go r.HandleClose(ws, msg)
r.handleClose(ws, msg)
}
}
}

func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) {
// handleReq handles new incoming REQ messages from client.
func (r *Relay) handleReq(ws *websocket.Conn, m message.Message) {
// TODO::: loadfrom database and sent in first query based on limit.
// TODO::: return EOSE.
// TODO::: return EVENT messages.

msg, ok := m.(*message.Req)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message"))
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message."))

return
}
Expand All @@ -99,25 +103,22 @@ func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) {

subs, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s",
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.",
ws.RemoteAddr())))

return
}

subs[msg.SubscriptionID] = msg.Filters

// TODO::: return EVENT messages.
}

func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {
// TODO::: send events to be stored and proccessed.

// handleEvent handles new incoming EVENT messages from client.
func (r *Relay) handleEvent(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Event)
if !ok {
okm := message.MakeOK(false,
"",
"error: can't parse the message.", // TODO::: make an error builder.
"error: can't parse EVENT message.",
)

_, _ = ws.Write(okm)
Expand All @@ -128,7 +129,7 @@ func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {
if !msg.Event.IsValid() {
okm := message.MakeOK(false,
msg.SubscriptionID,
"invalid: invalid id or sig.", // TODO::: make an error builder.
"invalid: id or sig is not correct.",
)

_, _ = ws.Write(okm)
Expand All @@ -140,21 +141,19 @@ func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {

for conn, subs := range r.conns {
for id, filters := range subs {
// is this concurrent safe?
go func(conn *websocket.Conn, id string, filters filter.Filters) {
if !filters.Match(msg.Event) {
return
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event))
}(conn, id, filters)
if !filters.Match(msg.Event) {
return
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event))
}
}
}

func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) {
// handleClose handles new incoming CLOSE messages from client.
func (r *Relay) handleClose(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Close)
if !ok {
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message"))
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message."))

return
}
Expand All @@ -164,14 +163,14 @@ func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) {

conn, ok := r.conns[ws]
if !ok {
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s",
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s.",
ws.RemoteAddr())))

return
}

delete(conn, msg.String())
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully"))
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully."))
}

// Stop shutdowns the relay gracefully.
Expand All @@ -180,9 +179,12 @@ func (r *Relay) Stop() error {
defer r.connsLock.Unlock()

for wsConn, subs := range r.conns {
// close all subscriptions.
for id := range subs {
_, _ = wsConn.Write(message.MakeClosed(id, "relay is stopping."))
_, _ = wsConn.Write(message.MakeClosed(id, "error: shutdowning the relay."))
}

// close connection.
_ = wsConn.Close()
}

Expand Down
14 changes: 0 additions & 14 deletions tmp/client_recvr.js

This file was deleted.

9 changes: 0 additions & 9 deletions tmp/client_sendr.js

This file was deleted.

0 comments on commit c272d43

Please sign in to comment.