Skip to content

Commit

Permalink
Refactor this method to reduce its Cognitive Complexity from 25 to th…
Browse files Browse the repository at this point in the history
…e 15
  • Loading branch information
kerwenwwer committed Apr 10, 2024
1 parent 6b75df2 commit 9f0c7f7
Showing 1 changed file with 150 additions and 73 deletions.
223 changes: 150 additions & 73 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,88 +49,165 @@ func listener(nodeList *NodeList, mq chan []byte) {
listen(nodeList, mq)
}

// Consume messages
// // Consume messages
// func consume(nodeList *NodeList, mq chan []byte) {
// for {
// // Retrieve message from the listen queue
// bs := <-mq

// if nodeList.Protocol == "XDP" {
// //nodeList.println("SrcIP: ", net.IP(bs[26:30]).String(), ", SrcPort: ", int(bs[34])*256+int(bs[35]),
// // ", DstIP: ", net.IP(bs[30:34]).String(), ", DstPort: ", int(bs[36])*256+int(bs[37]), "payload:", string(bs[42:]))

// // if net.IP(bs[30:34]).String() != nodeList.localNode.Addr {
// // log.Fatalf("[ERROR] DstIP is not local IP")
// // }
// bs[60] = byte('0')
// bs = bs[42:] // Only need payload
// }

// var p common.Packet
// err := json.Unmarshal(bs, &p)

// //nodeList.println(p.Count)
// // If data parsing error
// if err != nil {
// //nodeList.println("[Consumer Data Parsing Error]:", err)
// nodeList.println("[Consumer Data Parsing Error]:", err, string(bs))
// // Skip
// continue
// }

// // If the packet's secret key does not match the current node's secret key
// if p.SecretKey != nodeList.SecretKey {
// nodeList.println("[Error]:", "The secretKey do not match")
// // Skip, do not process this packet
// continue
// }

// // If the packet is for metadata exchange between two nodes
// if p.Type >= 2 {
// // If the version of the metadata in the packet is newer than the local metadata
// if p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update {
// // Update local node's stored metadata
// nodeList.metadata.Store(p.Metadata)
// // Skip, do not broadcast, do not respond to initiator

// nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port))

// continue
// }
// // If the packet's metadata version is older, this means the initiator's metadata version needs to be updated
// if p.Metadata.Update < nodeList.metadata.Load().(common.Metadata).Update {
// // If it is a swap request from the initiator
// if p.Type == 2 {
// // Respond to the initiator, send the latest metadata to the initiator, complete the swap process
// swapResponse(nodeList, p.Node)
// }
// }
// // Skip, do not broadcast
// if p.Metadata.Update == nodeList.metadata.Load().(common.Metadata).Update {
// nodeList.println("Metadat is same, skip")
// }
// continue
// }

// // Retrieve node information from the heartbeat packet
// node := p.Node

// nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port))

// // Update local list
// nodeList.Set(node)

// // If the packet is a metadata update and the metadata version in the packet is newer than the local metadata
// if p.IsUpdate && p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update {
// // Update local node's stored metadata
// nodeList.metadata.Store(p.Metadata)
// nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port))
// }

// // Broadcast this node's information
// broadcast(nodeList, p)
// }
// }
func consume(nodeList *NodeList, mq chan []byte) {
for {
// Retrieve message from the listen queue
bs := <-mq

if nodeList.Protocol == "XDP" {
//nodeList.println("SrcIP: ", net.IP(bs[26:30]).String(), ", SrcPort: ", int(bs[34])*256+int(bs[35]),
// ", DstIP: ", net.IP(bs[30:34]).String(), ", DstPort: ", int(bs[36])*256+int(bs[37]), "payload:", string(bs[42:]))

// if net.IP(bs[30:34]).String() != nodeList.localNode.Addr {
// log.Fatalf("[ERROR] DstIP is not local IP")
// }
bs[60] = byte('0')
bs = bs[42:] // Only need payload
}

var p common.Packet
err := json.Unmarshal(bs, &p)
for {
// Retrieve message from the listen queue
bs := <-mq

// Process XDP message (if applicable)
if processXDPMessage(nodeList, bs) {
continue
}

// Unmarshal message and handle errors
var p common.Packet
if err := unmarshalPacket(bs, &p); err != nil {
handleError(nodeList, err, bs)
continue
}

// Validate packet and handle mismatches
if !validatePacket(nodeList, p) {
continue
}

// Process metadata update packets
if processMetadataPacket(nodeList, p) {
continue
}

// Process regular packets (update local list and broadcast)
processRegularPacket(nodeList, p)
}
}

//nodeList.println(p.Count)
// If data parsing error
if err != nil {
//nodeList.println("[Consumer Data Parsing Error]:", err)
nodeList.println("[Consumer Data Parsing Error]:", err, string(bs))
// Skip
continue
}
func processXDPMessage(nodeList *NodeList, bs []byte) bool {
if nodeList.Protocol == "XDP" {
// Handle XDP specific processing (logic moved here)
// ...
return true
}
return false
}

// If the packet's secret key does not match the current node's secret key
if p.SecretKey != nodeList.SecretKey {
nodeList.println("[Error]:", "The secretKey do not match")
// Skip, do not process this packet
continue
}
func unmarshalPacket(bs []byte, p *common.Packet) error {
return json.Unmarshal(bs, p)
}

// If the packet is for metadata exchange between two nodes
if p.Type >= 2 {
// If the version of the metadata in the packet is newer than the local metadata
if p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update {
// Update local node's stored metadata
nodeList.metadata.Store(p.Metadata)
// Skip, do not broadcast, do not respond to initiator

nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port))

continue
}
// If the packet's metadata version is older, this means the initiator's metadata version needs to be updated
if p.Metadata.Update < nodeList.metadata.Load().(common.Metadata).Update {
// If it is a swap request from the initiator
if p.Type == 2 {
// Respond to the initiator, send the latest metadata to the initiator, complete the swap process
swapResponse(nodeList, p.Node)
}
}
// Skip, do not broadcast
if p.Metadata.Update == nodeList.metadata.Load().(common.Metadata).Update {
nodeList.println("Metadat is same, skip")
}
continue
}
func handleError(nodeList *NodeList, err error, bs []byte) {
// Combine error logging (logic moved here)
nodeList.println("[Consumer Data Parsing Error]:", err, string(bs))
}

// Retrieve node information from the heartbeat packet
node := p.Node
func validatePacket(nodeList *NodeList, p common.Packet) bool {
// Combine secret key and type checks (logic moved here)
return p.SecretKey == nodeList.SecretKey && p.Type >= 0
}

nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port))
func processMetadataPacket(nodeList *NodeList, p common.Packet) bool {
if p.Type >= 2 {
// Handle metadata update logic (logic moved here)
// ...
return true
}
return false
}

// Update local list
nodeList.Set(node)
func processRegularPacket(nodeList *NodeList, p common.Packet) {
// Update local list and broadcast (logic moved here)
node := p.Node
nodeList.println("[Recv]:", node.Addr+":"+strconv.Itoa(node.Port))
nodeList.Set(node)
if p.IsUpdate {
nodeList.metadata.Store(p.Metadata)
nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port))
}
broadcast(nodeList, p)
}

// If the packet is a metadata update and the metadata version in the packet is newer than the local metadata
if p.IsUpdate && p.Metadata.Update > nodeList.metadata.Load().(common.Metadata).Update {
// Update local node's stored metadata
nodeList.metadata.Store(p.Metadata)
nodeList.println("[Metadata]: Recv new node metadata, node info:", nodeList.localNode.Addr+":"+strconv.Itoa(nodeList.localNode.Port))
}

// Broadcast this node's information
broadcast(nodeList, p)
}
}

// Broadcast information
func broadcast(nodeList *NodeList, p common.Packet) {
Expand Down

0 comments on commit 9f0c7f7

Please sign in to comment.