diff --git a/Dockerfile b/Dockerfile index dd59fbf..4cb735d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* # This command removes the apt cache # Copy necessary files from the host to the container filesystem -COPY bpf/* /bpf/ +COPY pkg/bpf/* /bpf/ COPY bin/xdp-gossip /usr/local/bin/xdp-gossip COPY k8s/entrypoint.sh /entrypoint.sh diff --git a/Makefile b/Makefile index 3d84612..7736f3f 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ all: build docker-build # Rule to generate BPF object files from C source bpf-objects: - go generate ./bpf/ + go generate ./pkg/bpf/ # Rule to build the main application build: bpf-objects diff --git a/k8s/deployment.yaml b/k8s/deployment.yaml index 44f7749..61d26fc 100644 --- a/k8s/deployment.yaml +++ b/k8s/deployment.yaml @@ -4,7 +4,7 @@ metadata: name: gossip-service namespace: gossip spec: - replicas: 60 + replicas: 5 selector: matchLabels: app: gossip-service diff --git a/main.go b/main.go index fe2eedc..a79d6d7 100644 --- a/main.go +++ b/main.go @@ -13,9 +13,11 @@ import ( // Third-party imports are grouped separately. // This includes all external packages not part of the standard library. // Keeping standard and third-party imports separate improves readability. + "github.com/kerwenwwer/eGossip/modules/helper" nd "github.com/kerwenwwer/eGossip/modules/nodeList" "github.com/kerwenwwer/eGossip/pkg/bpf" "github.com/kerwenwwer/eGossip/pkg/common" + logger "github.com/kerwenwwer/eGossip/pkg/logger" "github.com/spf13/cobra" // Cobra package for CLI interactions. ) @@ -100,6 +102,8 @@ func startServer(cfg Config) error { return fmt.Errorf("[Init]: Failed to initialize node list: %w", err) } + nodeList.Logger = logger.NewLogger(&logger.LoggerConfig{Development: true}) + nodeList.Join() // Join the network. http.HandleFunc("/set", nodeList.SetNodeHandler()) @@ -189,7 +193,7 @@ func loadAndAssignBPFProgram(nodeList *nd.NodeList, linkName string, debug bool, } nodeList.Program = obj - l, xsk := common.ProgramHandler(linkName, obj, debug, mode) + l, xsk := helper.ProgramHandler(linkName, obj, debug, mode) if l != nil { defer l.Close() } diff --git a/modules/nodeList/nodeList.go b/modules/nodeList/nodeList.go index 7b3a529..498b556 100644 --- a/modules/nodeList/nodeList.go +++ b/modules/nodeList/nodeList.go @@ -126,7 +126,7 @@ func (nodeList *NodeList) Join() { // Consume the information in the mq queue go consume(nodeList, mq) - nodeList.Logger.Sugar().Panicln("[Control]: Join signal for ", nodeList.LocalNode) + nodeList.Logger.Sugar().Infoln("[Control]: Join signal for ", nodeList.LocalNode) } // Stop stops the broadcasting of heartbeat @@ -139,7 +139,7 @@ func (nodeList *NodeList) Stop() { return } - nodeList.Logger.Sugar().Panicln("[Control]: Stop signal for ", nodeList.LocalNode) + nodeList.Logger.Sugar().Infoln("[Control]: Stop signal for ", nodeList.LocalNode) nodeList.status.Store(false) } @@ -158,7 +158,7 @@ func (nodeList *NodeList) Start() { // Return directly return } - nodeList.Logger.Sugar().Panicln("[Control]: Start signal for ", nodeList.LocalNode) + nodeList.Logger.Sugar().Infoln("[Control]: Start signal for ", nodeList.LocalNode) nodeList.status.Store(true) // Periodically broadcast local node information go task(nodeList) @@ -204,7 +204,7 @@ func (nodeList *NodeList) Get() []common.Node { //If this node has not been updated for a while, delete it if v.(int64)+nodeList.Timeout < time.Now().Unix() { nodeList.nodes.Delete(k) - nodeList.Logger.Sugar().Panicln("[[Timeout]:", k, "has been deleted]") + nodeList.Logger.Sugar().Warnln("[[Timeout]:", k, "has been deleted]") } else { nodes = append(nodes, k.(common.Node)) } @@ -222,7 +222,7 @@ func (nodeList *NodeList) Publish(newMetadata []byte) { return } - nodeList.Logger.Sugar().Panicln("[Control]: Metadata Publish in", nodeList.LocalNode, "/ [Metadata]:", newMetadata) + nodeList.Logger.Sugar().Infoln("[Control]: Metadata Publish in", nodeList.LocalNode, "/ [Metadata]:", newMetadata) // Add the local node to the infected node list var infected = make(map[string]bool) diff --git a/modules/nodeList/sync.go b/modules/nodeList/sync.go index 66fe29b..ce9fd4d 100644 --- a/modules/nodeList/sync.go +++ b/modules/nodeList/sync.go @@ -2,7 +2,6 @@ package nodeList import ( "encoding/json" - "fmt" "strconv" "time" @@ -131,7 +130,7 @@ func processRegularPacket(nodeList *NodeList, p common.Packet) { nodeList.Set(node) if p.IsUpdate { nodeList.metadata.Store(p.Metadata) - nodeList.Logger.Sugar().Panicln("[Metadata]: Recv new node metadata, node info:", nodeList.LocalNode.Addr+":"+strconv.Itoa(nodeList.LocalNode.Port)) + nodeList.Logger.Sugar().Infoln("[Metadata]: Recv new node metadata, node info:", nodeList.LocalNode.Addr+":"+strconv.Itoa(nodeList.LocalNode.Port)) } broadcast(nodeList, p) } @@ -184,7 +183,7 @@ func broadcast(nodeList *NodeList, p common.Packet) { for _, v := range targetNodes { bs, err := json.Marshal(p) if err != nil { - nodeList.Logger.Sugar().Panic("[Infection Error]:", err) + nodeList.Logger.Sugar().Panicln("[Infection Error]:", err) } // Send the packet @@ -335,6 +334,6 @@ func listen(nodeList *NodeList, mq chan []byte) { } else if nodeList.Protocol == "XDP" { transport.XdpListen(nodeList.Xsk, mq) } else { - fmt.Println("Protocol not supported, only UDP, TC and XDP.") + nodeList.Logger.Sugar().Panicln("Protocol not supported, only UDP, TC and XDP.") } } diff --git a/pkg/common/common.go b/pkg/common/common.go index 74e1f91..bb58df1 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -169,8 +169,17 @@ func getGatewayIP() (net.IP, error) { return nil, fmt.Errorf("No default gateway found") } +// Convert an IPv4-mapped IPv6 address to IPv4 if necessary +func convertIPv4MappedIPv6ToIPv4(addr netip.Addr) netip.Addr { + if addr.Is4In6() { + v4Addr := addr.As4() + return netip.AddrFrom4(v4Addr) + } + return addr +} + func FindGatewayMAC(interfaceName string) (net.HardwareAddr, error) { - gatewayIP, err := getGatewayIP() // Assuming getGatewayIP returns net.IP, handle its error properly + gatewayIP, err := getGatewayIP() // getGatewayIP must return net.IP and handle its error if err != nil { return nil, fmt.Errorf("fail to get gateway IP: %v", err) } @@ -192,6 +201,9 @@ func FindGatewayMAC(interfaceName string) (net.HardwareAddr, error) { return nil, fmt.Errorf("invalid IP address: %v", gatewayIP) } + // Check for IPv4 mapped in IPv6 and convert if necessary + gatewayAddr = convertIPv4MappedIPv6ToIPv4(gatewayAddr) + mac, err := client.Resolve(gatewayAddr) if err != nil { return nil, fmt.Errorf("fail to resolve %v: %v", gatewayAddr, err) diff --git a/pkg/transport/udp.go b/pkg/transport/udp.go index b4ef6d1..1cac197 100644 --- a/pkg/transport/udp.go +++ b/pkg/transport/udp.go @@ -6,7 +6,6 @@ import ( //"log" "net" - "github.com/asavie/xdp" logger "github.com/kerwenwwer/eGossip/pkg/logger" ) @@ -80,38 +79,3 @@ func UdpListen(logger *logger.Logger, addr string, port int, size int, mq chan [ mq <- b } } - -func XdpListen(xsk *xdp.Socket, mq chan []byte) { - for { - // If there are any free slots on the Fill queue... - if n := xsk.NumFreeFillSlots(); n > 0 { - // ...then fetch up to that number of not-in-use - // descriptors and push them onto the Fill ring queue - // for the kernel to fill them with the received - // frames. - xsk.Fill(xsk.GetDescs(n)) - } - - // Wait for receive - meaning the kernel has - // produced one or more descriptors filled with a received - // frame onto the Rx ring queue. - numRx, _, err := xsk.Poll(-1) - if err != nil { - fmt.Printf("error: %v\n", err) - return - } - - if numRx > 0 { - // Consume the descriptors filled with received frames - // from the Rx ring queue. - rxDescs := xsk.Receive(numRx) - // Print the received frames and also modify them - // in-place replacing the destination MAC address with - // broadcast address. - for i := 0; i < len(rxDescs); i++ { - pktData := xsk.GetFrame(rxDescs[i]) - mq <- pktData - } - } - } -} diff --git a/pkg/transport/xdp.go b/pkg/transport/xdp.go new file mode 100644 index 0000000..1c252c9 --- /dev/null +++ b/pkg/transport/xdp.go @@ -0,0 +1,44 @@ +package transport + +import ( + + //"log" + + "github.com/asavie/xdp" + "github.com/kerwenwwer/eGossip/pkg/logger" +) + +func XdpListen(xsk *xdp.Socket, mq chan []byte) { + for { + // If there are any free slots on the Fill queue... + if n := xsk.NumFreeFillSlots(); n > 0 { + // ...then fetch up to that number of not-in-use + // descriptors and push them onto the Fill ring queue + // for the kernel to fill them with the received + // frames. + xsk.Fill(xsk.GetDescs(n)) + } + + // Wait for receive - meaning the kernel has + // produced one or more descriptors filled with a received + // frame onto the Rx ring queue. + numRx, _, err := xsk.Poll(-1) + if err != nil { + logger.NewNopLogger().Sugar().Panicln("error: %v\n", err) + return + } + + if numRx > 0 { + // Consume the descriptors filled with received frames + // from the Rx ring queue. + rxDescs := xsk.Receive(numRx) + // Print the received frames and also modify them + // in-place replacing the destination MAC address with + // broadcast address. + for i := 0; i < len(rxDescs); i++ { + pktData := xsk.GetFrame(rxDescs[i]) + mq <- pktData + } + } + } +}