Skip to content

Commit

Permalink
Merge pull request #11 from biohackerellie/dev
Browse files Browse the repository at this point in the history
feat: Structured logging added
  • Loading branch information
biohackerellie authored Jan 28, 2025
2 parents 2108415 + 2bc4fb1 commit 8a35830
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 93 deletions.
6 changes: 6 additions & 0 deletions .changeset/polite-goats-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"pushpop": major
"@epklabs/pushpop": major
---

Updated packages and added structured logging to go package
36 changes: 36 additions & 0 deletions .github/renovate.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended"
],
"labels": [
"dependencies"
],
"autoApprove": true,
"schedule": [
"after 10pm",
"before 5am"
],
"baseBranches": [
"main"
],
"gomod": {
"fileMatch": [
"(^|/)go\\.mod$"
],
"pinDigests": false,
"postUpdateOptions": [
"gomodTidy"
]
},
"updateInternalDeps": true,
"rangeStrategy": "bump",
"automerge": true,
"npm": {
"fileMatch": [
"(^|/)package\\.json$",
"(^|/)package\\.json\\.hbs$",
"pnpm-workspace\\.yaml$"
]
}
}
47 changes: 24 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pushpop

import (
"encoding/json"
"log"
"net/http"
"strings"
"sync"
Expand All @@ -17,6 +16,7 @@ type Client struct {
hub *Hub
conn *websocket.Conn
send chan Message
log Logger
}

// Constants for WebSocket timeouts.
Expand All @@ -41,16 +41,16 @@ func ServeWs(hub *Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("Failed to upgrade connection", "err", err)
hub.log.Error("Failed to upgrade connection", "err", err)
return
}
conn.SetReadLimit(maxMessageSize)
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Print("Error setting read deadline", "err", err)
hub.log.Error("Error setting read deadline", "err", err)
}
conn.SetPongHandler(func(string) error {
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Print("Error setting read deadline", "err", err)
hub.log.Error("Error setting read deadline", "err", err)
}
return nil
})
Expand All @@ -60,6 +60,7 @@ func ServeWs(hub *Hub) http.HandlerFunc {
conn: conn,
send: make(chan Message, 256),
channels: sync.Map{},
log: hub.log,
}

hub.clients.Store(client, true)
Expand All @@ -80,24 +81,24 @@ func (c *Client) readPump() {
messageType, rawMessage, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Printf("WebSocket closed by client: %v", c.conn.RemoteAddr())
c.log.Info("WebSocket closed by client", "addr", c.conn.RemoteAddr())
} else if strings.Contains(err.Error(), "connection reset by peer") {
log.Printf("Connection reset by peer for client: %v. Cleaning up.", c.conn.RemoteAddr())
c.log.Warn("Connection reset by peer for client. Cleaning up.", "client", c.conn.RemoteAddr())
} else {
log.Printf("Error reading message from client: %v, err: %v", c.conn.RemoteAddr(), err)
c.log.Warn("Error reading message from client", "client", c.conn.RemoteAddr(), "err", err)
}
return
}

if messageType != websocket.TextMessage {
log.Printf("Unsupported message type from client: %v, type: %d", c.conn.RemoteAddr(), messageType)
c.log.Warn("Unsupported message type from client", "client", c.conn.RemoteAddr(), "message", messageType)
continue
}

// Parse the message as JSON
var message map[string]interface{}
if err := json.Unmarshal(rawMessage, &message); err != nil {
log.Printf("Invalid JSON from client: %v, message: %s, err: %v", c.conn.RemoteAddr(), string(rawMessage), err)
c.log.Warn("Invalid JSON from client", "client", c.conn.RemoteAddr(), "message", string(rawMessage), "err", err)
continue
}

Expand All @@ -109,27 +110,27 @@ func (c *Client) readPump() {
case "ping":
// Respond to client heartbeat
if err := c.conn.WriteMessage(websocket.TextMessage, []byte(`{"action":"pong"}`)); err != nil {
log.Printf("Error sending pong to client: %v, err: %v", c.conn.RemoteAddr(), err)
c.log.Error("Error sending pong to client", "client", c.conn.RemoteAddr(), "err", err)
return
}
case "subscribe":
if channel == "" {
log.Printf("Client %v attempted to subscribe without specifying a channel.", c.conn.RemoteAddr())
c.log.Warn("Client attempted to subscribe without specifying a channel.", "client", c.conn.RemoteAddr())
continue
}
c.hub.register <- &Subscription{Client: c, Channel: channel}
log.Printf("Client %v subscribed to channel: %s", c.conn.RemoteAddr(), channel)
c.log.Debug("Client subscribed to channel", "client", c.conn.RemoteAddr(), "channel", channel)
case "unsubscribe":
if channel == "" {
log.Printf("Client %v attempted to unsubscribe without specifying a channel.", c.conn.RemoteAddr())
c.log.Warn("Client attempted to unsubscribe without specifying a channel.", "client", c.conn.RemoteAddr())
continue
}
c.hub.unregister <- &Subscription{Client: c, Channel: channel}
log.Printf("Client %v unsubscribed from channel: %s", c.conn.RemoteAddr(), channel)
c.log.Debug("Client unsubscribed from channel", "client", c.conn.RemoteAddr(), "channel", channel)
case "message":
payload := message["payload"]
if channel == "" {
log.Printf("Client %v attempted to send a message without specifying a channel.", c.conn.RemoteAddr())
c.log.Warn("Client attempted to send a message without specifying a channel.", "client", c.conn.RemoteAddr())
continue
}
msg := Message{
Expand All @@ -138,9 +139,9 @@ func (c *Client) readPump() {
Payload: payload,
}
c.hub.broadcast <- msg
log.Printf("Client %v sent a message to channel: %s", c.conn.RemoteAddr(), channel)
c.log.Debug("Client sent a message to channel", "client", c.conn.RemoteAddr(), "channel", channel)
default:
log.Printf("Unhandled action: %s from client: %v", action, c.conn.RemoteAddr())
c.log.Error("Unhandled action from client", "action", action, "client", c.conn.RemoteAddr())
}
}
}
Expand All @@ -157,7 +158,7 @@ func (c *Client) writePump() {
select {
case message, ok := <-c.send:
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Print("Error setting write deadline", "err", err)
c.log.Error("Error setting write deadline", "err", err)
}
if !ok {
// The hub closed the channel.
Expand All @@ -167,21 +168,21 @@ func (c *Client) writePump() {

if err := c.conn.WriteJSON(message); err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Print("WebSocket closed by client")
c.log.Debug("WebSocket closed by client")
} else {
log.Print("Error writing JSON", "err", err)
c.log.Error("Error writing JSON", "err", err)
}
return
}
case <-ticker.C:
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Print("Error setting write deadline", "err", err)
c.log.Error("Error setting write deadline", "err", err)
}
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Print("WebSocket closed by client")
c.log.Debug("WebSocket closed by client")
} else {
log.Print("Error sending ping", "err", err)
c.log.Error("Error sending ping", "err", err)
}
return
}
Expand Down
24 changes: 17 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -12,9 +12,18 @@ import (

func main() {

hub := p.NewHub()
go hub.Run()
var levelVar slog.LevelVar
level, _ := os.LookupEnv("LOG_LEVEL")
if err := levelVar.UnmarshalText([]byte(level)); err != nil {
levelVar.Set(slog.LevelError)
}
logHandler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: &levelVar,
})
log := slog.New(logHandler)

hub := p.NewHub(log)
go hub.Run()
// Register routes
http.HandleFunc("/trigger", p.HandleTrigger(hub))
http.HandleFunc("/ws", p.ServeWs(hub))
Expand All @@ -29,18 +38,19 @@ func main() {

go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal("Server error", "err", err)
log.Error("Server error", "err", err)
panic(err)
}
}()
// Wait for shutdown signal
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
<-signalChan
log.Print("Shutting down server...")
log.Info("Shutting down server...")

// Stop accepting new requests and clean up
if err := server.Shutdown(shutdownCtx); err != nil {
log.Fatal("Server shutdown failed", "err", err)
log.Error("Server shutdown failed", "err", err)
}
log.Print("Server gracefully stopped")
log.Info("Server gracefully stopped")
}
14 changes: 11 additions & 3 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pushpop
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
Expand All @@ -29,16 +28,25 @@ type Hub struct {
register chan *Subscription
unregister chan *Subscription
channels sync.Map
log Logger
}

type Logger interface {
Debug(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
Error(msg string, args ...any)
}

// NewHub creates a new Hub.
func NewHub() *Hub {
func NewHub(log Logger) *Hub {
return &Hub{
broadcast: make(chan Message, 100),
register: make(chan *Subscription, 100),
unregister: make(chan *Subscription, 100),
channels: sync.Map{},
clients: sync.Map{},
log: log,
}
}

Expand Down Expand Up @@ -116,7 +124,7 @@ func HandleTrigger(hub *Hub) http.HandlerFunc {

var message Message
if err := json.NewDecoder(r.Body).Decode(&message); err != nil {
log.Print("error decoding message", err)
hub.log.Error("error decoding message", "err", err)
http.Error(w, "Invalid Request Body", http.StatusBadRequest)
return
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
},
"dependencies": {
"@changesets/changelog-github": "^0.4.8",
"@changesets/cli": "^2.27.3"
"@changesets/cli": "^2.27.12"
},
"packageManager": "pnpm@9.14.4",
"devDependencies": {
"turbo": "^2.3.3"
"turbo": "^2.3.4"
}
}
Loading

0 comments on commit 8a35830

Please sign in to comment.