Skip to content

Commit

Permalink
Merge pull request #44 from HM4704/fix-missing-txs
Browse files Browse the repository at this point in the history
Fix missing txs
  • Loading branch information
lunfardo314 authored Feb 28, 2025
2 parents 82e9eb3 + 7c2c63b commit 382a9b3
Showing 1 changed file with 104 additions and 4 deletions.
108 changes: 104 additions & 4 deletions api/streaming/dag_vertex_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@ package streaming
import (
"encoding/json"
"net/http"
"sync"

"github.com/gorilla/websocket"
"github.com/lunfardo314/proxima/api"
"github.com/lunfardo314/proxima/core/txmetadata"
"github.com/lunfardo314/proxima/global"
"github.com/lunfardo314/proxima/ledger"
"github.com/lunfardo314/proxima/ledger/transaction"
"github.com/lunfardo314/proxima/util"
"github.com/lunfardo314/proxima/util/set"
)

type (
environment interface {
global.Logging
OnTransaction(fun func(tx *transaction.Transaction) bool)
TxBytesStore() global.TxBytesStore
}
wsServer struct {
environment
Expand All @@ -31,24 +36,119 @@ func Run(env environment) {
http.HandleFunc(api.PathDAGVertexStream, srv.dagVertexStreamHandler)
}

func vertexDepsForTx(srv *wsServer, txidstr string) []byte {

txid, err := ledger.TransactionIDFromHexString(txidstr)
if err != nil {
return nil
}

txBytesWithMetadata := srv.TxBytesStore().GetTxBytesWithMetadata(&txid)
if len(txBytesWithMetadata) == 0 {
return nil
}

_, txBytes, err := txmetadata.SplitTxBytesWithMetadata(txBytesWithMetadata)
if err != nil {
return nil
}

tx, err := transaction.FromBytes(txBytes, transaction.MainTxValidationOptions...)
if err != nil {
return nil
}
resp := api.VertexWithDependenciesFromTransaction(tx)
respBin, err := json.Marshal(resp)
if err != nil {
return nil
}
return respBin
}

// WebSocket handler
const keepMaxSlots = 10 // Keep only last 10 slots

func (srv *wsServer) dagVertexStreamHandler(w http.ResponseWriter, r *http.Request) {
u := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
conn, err := u.Upgrade(w, r, nil)
if err != nil {
srv.Log().Warnf("[%s] web socket client connected, remote: %s", TraceTag, r.RemoteAddr)
srv.Log().Warnf("[%s] WebSocket upgrade failed, remote: %s", TraceTag, r.RemoteAddr)
api.WriteErr(w, "failed to upgrade to websocket connection")
return
}

srv.Log().Infof("[%s] web socket client connected, remote: %s", TraceTag, r.RemoteAddr)

var respBin []byte
// Thread-safe storage for transactions per slot
var mu sync.Mutex
txSlots := make(map[uint32]set.Set[string]) // Slot -> Set of transaction IDs
var latestSlot uint32

srv.OnTransaction(func(tx *transaction.Transaction) bool {
srv.Tracef(TraceTag, "TX ID: %s", tx.IDShortString())
mu.Lock()
defer mu.Unlock()

txID := tx.IDShortString()
slot := uint32(tx.Timestamp().Slot())

srv.Tracef(TraceTag, "Processing TX ID: %s (Slot: %d)", txID, slot)

// Initialize latestSlot dynamically
if latestSlot == 0 {
latestSlot = slot
}

// Cleanup old slots
if slot > latestSlot {
latestSlot = slot
for oldSlot := range txSlots {
if oldSlot < latestSlot-keepMaxSlots {
delete(txSlots, oldSlot)
srv.Log().Infof("Removed old slot: %d", oldSlot)
}
}
}

// Ensure slot set exists
if _, exists := txSlots[slot]; !exists {
txSlots[slot] = set.New[string]()
}

// Convert transaction to vertex
vertexWD := api.VertexWithDependenciesFromTransaction(tx)
respBin, err = json.MarshalIndent(vertexWD, "", " ")

// Store transaction ID in its slot
txSlots[slot].Insert(vertexWD.ID)

// Process dependencies
for _, i := range vertexWD.Inputs {
txid, err := ledger.TransactionIDFromHexString(i)
if err != nil {
srv.Log().Warnf("Failed to parse TransactionID from hex: %s, err: %v", i, err)
continue // Skip this input
}

depSlot := uint32(txid.Timestamp().Slot())

// Ensure slot set exists
if _, exists := txSlots[depSlot]; !exists {
txSlots[depSlot] = set.New[string]()
}

if !txSlots[depSlot].Contains(i) {
respBin := vertexDepsForTx(srv, i)
if respBin != nil {
srv.Log().Infof("Send tx not seen yet %s", i)
txSlots[depSlot].Insert(i)
if err = conn.WriteMessage(websocket.TextMessage, respBin); err != nil {
srv.Log().Infof("[%s] WebSocket client disconnected, remote: %s, err = %v", TraceTag, r.RemoteAddr, err)
}
}
}
}

// Send the transaction itself
respBin, err := json.MarshalIndent(vertexWD, "", " ")
util.AssertNoError(err)

if err = conn.WriteMessage(websocket.TextMessage, respBin); err != nil {
Expand Down

0 comments on commit 382a9b3

Please sign in to comment.