Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added function related to the debugger #528

Merged
merged 19 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e750f30
modified functions to return EventList
Nov 13, 2023
dc88b2a
modified functions to return EventList
Nov 13, 2023
d88c2f1
Confine EventRecord usage to the recorder impl.
matejpavlovic Nov 28, 2023
ef1a743
Merge remote-tracking branch 'origin/visualization' into visualization
Nov 28, 2023
e40e168
Removed InterceptWithReturn and modified Intercept to now return an E…
Cath3876 Nov 28, 2023
e6fa802
changed return value to nil and feed output of one interceptor to inp…
Cath3876 Nov 30, 2023
211222d
Added functions for the debugger in debugger.go
Cath3876 Dec 7, 2023
a7e0190
changed main.go in pingpong to be able to use the debugger
Cath3876 Dec 7, 2023
0685c50
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
d6b7356
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
f3f04be
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
10e585d
minor fixes to pass checks on pull request.
Cath3876 Dec 7, 2023
83e0706
Merge branch 'visualization' into debugger
Cath3876 Jan 30, 2024
b0ee2f0
InterceptorInit() now always returns the interceptor,
Cath3876 Jan 30, 2024
5bdc11c
addressed failed checks
Cath3876 Jan 30, 2024
b369a27
Simplify interface-string conversions
matejpavlovic Feb 5, 2024
d193192
change InterceptorInit to NewWebSocketDebugger and update its descrip…
Cath3876 Feb 6, 2024
04b6a7c
Pass logger as parameter to the debugger creation
matejpavlovic Feb 6, 2024
12827ef
Clean up a bit and separate debugger in 2 files
matejpavlovic Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/debugger/debugger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package debugger

import (
"fmt"

"github.com/filecoin-project/mir/pkg/eventlog"
"github.com/filecoin-project/mir/pkg/logging"
t "github.com/filecoin-project/mir/pkg/types"
)

// NewWebSocketDebugger initializes the interceptor for a given node and uses the given port for the WebSocket connection
func NewWebSocketDebugger(
ownID t.NodeID,
port string,
logger logging.Logger,
) (*eventlog.Recorder, error) {
// writerFactory creates and returns a WebSocket-based event writer
writerFactory := func(_ string, ownID t.NodeID, l logging.Logger) (eventlog.EventWriter, error) {
return newWSWriter(fmt.Sprintf(":%s", port), l), nil
}

var interceptor *eventlog.Recorder
var err error
interceptor, err = eventlog.NewRecorder(
ownID,
fmt.Sprintf("./node%s", ownID),
logger,
eventlog.EventWriterOpt(writerFactory),
eventlog.SyncWriteOpt(),
)
if err != nil {
return nil, err
}
return interceptor, err
}
168 changes: 168 additions & 0 deletions pkg/debugger/websocketwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package debugger

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

"github.com/gorilla/websocket"

"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
)

const (
ReadBufferSize = 1024
WriteBufferSize = 1024
)

type WSWriter struct {
// ... websocket server variables ...
conn *websocket.Conn
upgrader websocket.Upgrader
eventSignal chan map[string]string
WSMessage struct {
Type string `json:"Type"`
Value string `json:"Value"`
}
logger logging.Logger
}

// newWSWriter creates a new WSWriter that establishes a websocket connection
func newWSWriter(port string, logger logging.Logger) *WSWriter {

// Create a new WSWriter object
wsWriter := &WSWriter{
upgrader: websocket.Upgrader{
ReadBufferSize: ReadBufferSize,
WriteBufferSize: WriteBufferSize,
},
eventSignal: make(chan map[string]string),
logger: logger,
}

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
wsWriter.upgrader.CheckOrigin = func(r *http.Request) bool { return true } // Allow opening the connection by HTML file
conn, err := wsWriter.upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
}

wsWriter.conn = conn
defer func() {
err := wsWriter.Close()
if err != nil {
panic(err)
}
}() // Ensure the connection is closed when the function exits

for {
messageType, message, err := conn.ReadMessage()
if err != nil || messageType != websocket.TextMessage {
break
}

var signal map[string]string
err = json.Unmarshal(message, &signal)
if err != nil {
panic(err)
}

// Check if the signal is a 'close' command
if signal["Type"] == "close" && signal["Value"] == "" {
break
}

wsWriter.HandleClientSignal(signal)
}
})

// Create an Async go routine that waits for the connection
go func() {
server := &http.Server{
Addr: port,
Handler: nil,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 15 * time.Second,
}

err := server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
panic(err)
}
}()
return wsWriter
}

// Flush does nothing at the moment
func (wsw *WSWriter) Flush() error {
return nil
}

// Close closes the connection
func (wsw *WSWriter) Close() error {
if wsw.conn == nil {
return nil
}
return wsw.conn.Close()
}

// Write sends every event to the frontend and then waits for a message detailing how to proceed with that event
// The returned EventList contains the accepted events
func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList, error) {
for wsw.conn == nil {
wsw.logger.Log(logging.LevelInfo, "No connection")
time.Sleep(time.Millisecond * 100) // TODO: Why do we sleep here? Do we need it?
}
if list.Len() == 0 {
return list, nil
}

acceptedEvents := events.EmptyList()
iter := list.Iterator()

for event := iter.Next(); event != nil; event = iter.Next() {
// Create a new JSON object with a timestamp field
timestamp := time.Now()
logData := map[string]interface{}{
"event": event,
"timestamp": timestamp,
}

// Marshal the JSON data
message, err := json.Marshal(logData)
if err != nil {
panic(err)
}

// Send the JSON message over WebSocket
if err := wsw.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return list, fmt.Errorf("error sending message over WebSocket: %w", err)
}

action := <-wsw.eventSignal
acceptedEvents, _ = eventAction(action["Type"], action["Value"], acceptedEvents, event)
}
return acceptedEvents, nil
}

func (wsw *WSWriter) HandleClientSignal(signal map[string]string) {
wsw.eventSignal <- signal
}

// EventAction decides, based on the input what exactly is done next with the current event
func eventAction(
actionType string,
_ string,
acceptedEvents *events.EventList,
currentEvent *eventpb.Event,
) (*events.EventList, error) {
if actionType == "accept" {
acceptedEvents.PushBack(currentEvent)
}
return acceptedEvents, nil
}
41 changes: 35 additions & 6 deletions samples/pingpong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/filecoin-project/mir/pkg/debugger"

"github.com/filecoin-project/mir"
"github.com/filecoin-project/mir/pkg/eventlog"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/net/grpc"
Expand All @@ -18,17 +22,42 @@ import (

func main() {
fmt.Println("Starting ping-pong.")

// Manually create system membership with just 2 nodes.
membership := &trantorpbtypes.Membership{map[t.NodeID]*trantorpbtypes.NodeIdentity{ // nolint:govet
"0": {"0", "/ip4/127.0.0.1/tcp/10000", nil, "1"}, // nolint:govet
"1": {"1", "/ip4/127.0.0.1/tcp/10001", nil, "1"}, // nolint:govet
}}

// Get own ID from command line.
ownID := t.NodeID(os.Args[1])
debug := flag.Bool("d", false, "Enable debug mode")
debugPort := flag.String("port", "", "Debug port number")
flag.Parse()
var ownID t.NodeID
var interceptor *eventlog.Recorder
var err error

if *debug {
// In debug mode, expect the next argument to be the node ID
if flag.NArg() > 0 {
ownID = t.NodeID(flag.Arg(0))
} else {
fmt.Println("Node ID must be provided in debug mode")
os.Exit(1)
}
interceptor, err = debugger.NewWebSocketDebugger(ownID, *debugPort, logging.ConsoleInfoLogger) // replace ownID with port number
if err != nil {
panic(err)
}
} else {
// If not in debug mode, use the first argument as the node ID
if len(os.Args) > 1 {
ownID = t.NodeID(os.Args[1])
} else {
fmt.Println("Node ID must be provided")
os.Exit(1)
}
}

// Instantiate network trnasport module and establish connections.
// Instantiate network transport module and establish connections.
transport, err := grpc.NewTransport(ownID, membership.Nodes[ownID].Addr, logging.ConsoleWarnLogger)
if err != nil {
panic(err)
Expand All @@ -48,7 +77,7 @@ func main() {
"pingpong": lowlevel.NewPingPong(ownID),
"timer": timer.New(),
},
nil,
interceptor,
)
if err != nil {
panic(err)
Expand All @@ -60,7 +89,7 @@ func main() {
nodeError <- node.Run(context.Background())
}()
fmt.Println("Mir node running.")
time.Sleep(5 * time.Second)
time.Sleep(50 * time.Second)

// Stop the node.
node.Stop()
Expand Down
Loading