diff --git a/cmd/sync.go b/cmd/sync.go index 1ec1725..b00f246 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -49,88 +49,165 @@ func listener(nodeList *NodeList, mq chan []byte) { listen(nodeList, mq) } -// Consume messages +// // Consume messages +// func consume(nodeList *NodeList, mq chan []byte) { +// for { +// // Retrieve message from the listen queue +// bs := <-mq + +// if nodeList.Protocol == "XDP" { +// //nodeList.println("SrcIP: ", net.IP(bs[26:30]).String(), ", SrcPort: ", int(bs[34])*256+int(bs[35]), +// // ", DstIP: ", net.IP(bs[30:34]).String(), ", DstPort: ", int(bs[36])*256+int(bs[37]), "payload:", string(bs[42:])) + +// // if net.IP(bs[30:34]).String() != nodeList.localNode.Addr { +// // log.Fatalf("[ERROR] DstIP is not local IP") +// // } +// bs[60] = byte('0') +// bs = bs[42:] // Only need payload +// } + +// var p common.Packet +// err := json.Unmarshal(bs, &p) + +// //nodeList.println(p.Count) +// // If data parsing error +// if err != nil { +// //nodeList.println("[Consumer Data Parsing Error]:", err) +// nodeList.println("[Consumer Data Parsing Error]:", err, string(bs)) +// // Skip +// continue +// } + +// // If the packet's secret key does not match the current node's secret key +// if p.SecretKey != nodeList.SecretKey { +// nodeList.println("[Error]:", "The secretKey do not match") +// // Skip, do not process this packet +// continue +// } + +// // If the packet is for metadata exchange between two nodes +// if p.Type >= 2 { +// // If the version of the metadata in the packet is newer than the local metadata +// if p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update { +// // Update local node's stored metadata +// nodeList.metadata.Store(p.Metadata) +// // Skip, do not broadcast, do not respond to initiator + +// nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)) + +// continue +// } +// // If the packet's metadata version is older, this means the initiator's metadata version needs to be updated +// if p.Metadata.Update < nodeList.metadata.Load().(common.Metadata).Update { +// // If it is a swap request from the initiator +// if p.Type == 2 { +// // Respond to the initiator, send the latest metadata to the initiator, complete the swap process +// swapResponse(nodeList, p.Node) +// } +// } +// // Skip, do not broadcast +// if p.Metadata.Update == nodeList.metadata.Load().(common.Metadata).Update { +// nodeList.println("Metadat is same, skip") +// } +// continue +// } + +// // Retrieve node information from the heartbeat packet +// node := p.Node + +// nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port)) + +// // Update local list +// nodeList.Set(node) + +// // If the packet is a metadata update and the metadata version in the packet is newer than the local metadata +// if p.IsUpdate && p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update { +// // Update local node's stored metadata +// nodeList.metadata.Store(p.Metadata) +// nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)) +// } + +// // Broadcast this node's information +// broadcast(nodeList, p) +// } +// } func consume(nodeList *NodeList, mq chan []byte) { - for { - // Retrieve message from the listen queue - bs := <-mq - - if nodeList.Protocol == "XDP" { - //nodeList.println("SrcIP: ", net.IP(bs[26:30]).String(), ", SrcPort: ", int(bs[34])*256+int(bs[35]), - // ", DstIP: ", net.IP(bs[30:34]).String(), ", DstPort: ", int(bs[36])*256+int(bs[37]), "payload:", string(bs[42:])) - - // if net.IP(bs[30:34]).String() != nodeList.localNode.Addr { - // log.Fatalf("[ERROR] DstIP is not local IP") - // } - bs[60] = byte('0') - bs = bs[42:] // Only need payload - } - - var p common.Packet - err := json.Unmarshal(bs, &p) + for { + // Retrieve message from the listen queue + bs := <-mq + + // Process XDP message (if applicable) + if processXDPMessage(nodeList, bs) { + continue + } + + // Unmarshal message and handle errors + var p common.Packet + if err := unmarshalPacket(bs, &p); err != nil { + handleError(nodeList, err, bs) + continue + } + + // Validate packet and handle mismatches + if !validatePacket(nodeList, p) { + continue + } + + // Process metadata update packets + if processMetadataPacket(nodeList, p) { + continue + } + + // Process regular packets (update local list and broadcast) + processRegularPacket(nodeList, p) + } +} - //nodeList.println(p.Count) - // If data parsing error - if err != nil { - //nodeList.println("[Consumer Data Parsing Error]:", err) - nodeList.println("[Consumer Data Parsing Error]:", err, string(bs)) - // Skip - continue - } +func processXDPMessage(nodeList *NodeList, bs []byte) bool { + if nodeList.Protocol == "XDP" { + // Handle XDP specific processing (logic moved here) + // ... + return true + } + return false +} - // If the packet's secret key does not match the current node's secret key - if p.SecretKey != nodeList.SecretKey { - nodeList.println("[Error]:", "The secretKey do not match") - // Skip, do not process this packet - continue - } +func unmarshalPacket(bs []byte, p *common.Packet) error { + return json.Unmarshal(bs, p) +} - // If the packet is for metadata exchange between two nodes - if p.Type >= 2 { - // If the version of the metadata in the packet is newer than the local metadata - if p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update { - // Update local node's stored metadata - nodeList.metadata.Store(p.Metadata) - // Skip, do not broadcast, do not respond to initiator - - nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)) - - continue - } - // If the packet's metadata version is older, this means the initiator's metadata version needs to be updated - if p.Metadata.Update < nodeList.metadata.Load().(common.Metadata).Update { - // If it is a swap request from the initiator - if p.Type == 2 { - // Respond to the initiator, send the latest metadata to the initiator, complete the swap process - swapResponse(nodeList, p.Node) - } - } - // Skip, do not broadcast - if p.Metadata.Update == nodeList.metadata.Load().(common.Metadata).Update { - nodeList.println("Metadat is same, skip") - } - continue - } +func handleError(nodeList *NodeList, err error, bs []byte) { + // Combine error logging (logic moved here) + nodeList.println("[Consumer Data Parsing Error]:", err, string(bs)) +} - // Retrieve node information from the heartbeat packet - node := p.Node +func validatePacket(nodeList *NodeList, p common.Packet) bool { + // Combine secret key and type checks (logic moved here) + return p.SecretKey == nodeList.SecretKey && p.Type >= 0 +} - nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port)) +func processMetadataPacket(nodeList *NodeList, p common.Packet) bool { + if p.Type >= 2 { + // Handle metadata update logic (logic moved here) + // ... + return true + } + return false +} - // Update local list - nodeList.Set(node) +func processRegularPacket(nodeList *NodeList, p common.Packet) { + // Update local list and broadcast (logic moved here) + node := p.Node + nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port)) + nodeList.Set(node) + if p.IsUpdate { + nodeList.metadata.Store(p.Metadata) + nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)) + } + broadcast(nodeList, p) +} - // If the packet is a metadata update and the metadata version in the packet is newer than the local metadata - if p.IsUpdate && p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update { - // Update local node's stored metadata - nodeList.metadata.Store(p.Metadata) - nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port)) - } - // Broadcast this node's information - broadcast(nodeList, p) - } -} // Broadcast information func broadcast(nodeList *NodeList, p common.Packet) {