diff --git a/client.go b/client.go index dcfa6d1..2f6ce9e 100644 --- a/client.go +++ b/client.go @@ -77,47 +77,68 @@ func (c *Client) readPump() { for { messageType, rawMessage, err := c.conn.ReadMessage() if err != nil { - // Handle normal WebSocket close events if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { log.Printf("WebSocket closed by client: %v", c.conn.RemoteAddr()) } else if strings.Contains(err.Error(), "connection reset by peer") { - // Suppress log for expected errors log.Printf("Connection reset by peer for client: %v. Cleaning up.", c.conn.RemoteAddr()) } else { log.Printf("Error reading message from client: %v, err: %v", c.conn.RemoteAddr(), err) } - return // Exit on any read error + return } - // Handle valid messages - switch messageType { - case websocket.TextMessage: - // Process text messages - var message map[string]interface{} - if err := json.Unmarshal(rawMessage, &message); err != nil { - log.Printf("Invalid JSON from client: %v, message: %s, err: %v", c.conn.RemoteAddr(), string(rawMessage), err) - continue // Skip invalid JSON - } - action, _ := message["action"].(string) + if messageType != websocket.TextMessage { + log.Printf("Unsupported message type from client: %v, type: %d", c.conn.RemoteAddr(), messageType) + continue + } - switch action { - case "ping": - if err := c.conn.WriteMessage(websocket.TextMessage, []byte(`{"action":"pong"}`)); err != nil { - return - } - default: - log.Printf("Unhandled action: %s", action) - } + // Parse the message as JSON + var message map[string]interface{} + if err := json.Unmarshal(rawMessage, &message); err != nil { + log.Printf("Invalid JSON from client: %v, message: %s, err: %v", c.conn.RemoteAddr(), string(rawMessage), err) + continue + } + + // Extract the action and handle it + action, _ := message["action"].(string) + channel, _ := message["channel"].(string) - case websocket.PingMessage: - // Reply to WebSocket ping frames - if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil { - log.Printf("Error responding to ping from client: %v, err: %v", c.conn.RemoteAddr(), err) + switch action { + case "ping": + // Respond to client heartbeat + if err := c.conn.WriteMessage(websocket.TextMessage, []byte(`{"action":"pong"}`)); err != nil { + log.Printf("Error sending pong to client: %v, err: %v", c.conn.RemoteAddr(), err) return } - + case "subscribe": + if channel == "" { + log.Printf("Client %v attempted to subscribe without specifying a channel.", c.conn.RemoteAddr()) + continue + } + c.hub.register <- &Subscription{Client: c, Channel: channel} + log.Printf("Client %v subscribed to channel: %s", c.conn.RemoteAddr(), channel) + case "unsubscribe": + if channel == "" { + log.Printf("Client %v attempted to unsubscribe without specifying a channel.", c.conn.RemoteAddr()) + continue + } + c.hub.unregister <- &Subscription{Client: c, Channel: channel} + log.Printf("Client %v unsubscribed from channel: %s", c.conn.RemoteAddr(), channel) + case "message": + payload := message["payload"] + if channel == "" { + log.Printf("Client %v attempted to send a message without specifying a channel.", c.conn.RemoteAddr()) + continue + } + msg := Message{ + Channel: channel, + Event: "message", + Payload: payload, + } + c.hub.broadcast <- msg + log.Printf("Client %v sent a message to channel: %s", c.conn.RemoteAddr(), channel) default: - log.Printf("Unsupported message type from client: %v, type: %d", c.conn.RemoteAddr(), messageType) + log.Printf("Unhandled action: %s from client: %v", action, c.conn.RemoteAddr()) } } }