Skip to content

Commit

Permalink
Fix typo issue and some bugs in looger
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwenwwer committed May 4, 2024
1 parent b5062f9 commit 552c75f
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/* # This command removes the apt cache

# Copy necessary files from the host to the container filesystem
COPY bpf/* /bpf/
COPY pkg/bpf/* /bpf/
COPY bin/xdp-gossip /usr/local/bin/xdp-gossip
COPY k8s/entrypoint.sh /entrypoint.sh

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ all: build docker-build

# Rule to generate BPF object files from C source
bpf-objects:
go generate ./bpf/
go generate ./pkg/bpf/

# Rule to build the main application
build: bpf-objects
Expand Down
2 changes: 1 addition & 1 deletion k8s/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: gossip-service
namespace: gossip
spec:
replicas: 60
replicas: 5
selector:
matchLabels:
app: gossip-service
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
// Third-party imports are grouped separately.
// This includes all external packages not part of the standard library.
// Keeping standard and third-party imports separate improves readability.
"github.com/kerwenwwer/eGossip/modules/helper"
nd "github.com/kerwenwwer/eGossip/modules/nodeList"
"github.com/kerwenwwer/eGossip/pkg/bpf"
"github.com/kerwenwwer/eGossip/pkg/common"
logger "github.com/kerwenwwer/eGossip/pkg/logger"
"github.com/spf13/cobra" // Cobra package for CLI interactions.
)

Expand Down Expand Up @@ -100,6 +102,8 @@ func startServer(cfg Config) error {
return fmt.Errorf("[Init]: Failed to initialize node list: %w", err)
}

nodeList.Logger = logger.NewLogger(&logger.LoggerConfig{Development: true})

nodeList.Join() // Join the network.

http.HandleFunc("/set", nodeList.SetNodeHandler())
Expand Down Expand Up @@ -189,7 +193,7 @@ func loadAndAssignBPFProgram(nodeList *nd.NodeList, linkName string, debug bool,
}

nodeList.Program = obj
l, xsk := common.ProgramHandler(linkName, obj, debug, mode)
l, xsk := helper.ProgramHandler(linkName, obj, debug, mode)
if l != nil {
defer l.Close()
}
Expand Down
10 changes: 5 additions & 5 deletions modules/nodeList/nodeList.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (nodeList *NodeList) Join() {
// Consume the information in the mq queue
go consume(nodeList, mq)

nodeList.Logger.Sugar().Panicln("[Control]: Join signal for ", nodeList.LocalNode)
nodeList.Logger.Sugar().Infoln("[Control]: Join signal for ", nodeList.LocalNode)
}

// Stop stops the broadcasting of heartbeat
Expand All @@ -139,7 +139,7 @@ func (nodeList *NodeList) Stop() {
return
}

nodeList.Logger.Sugar().Panicln("[Control]: Stop signal for ", nodeList.LocalNode)
nodeList.Logger.Sugar().Infoln("[Control]: Stop signal for ", nodeList.LocalNode)
nodeList.status.Store(false)
}

Expand All @@ -158,7 +158,7 @@ func (nodeList *NodeList) Start() {
// Return directly
return
}
nodeList.Logger.Sugar().Panicln("[Control]: Start signal for ", nodeList.LocalNode)
nodeList.Logger.Sugar().Infoln("[Control]: Start signal for ", nodeList.LocalNode)
nodeList.status.Store(true)
// Periodically broadcast local node information
go task(nodeList)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (nodeList *NodeList) Get() []common.Node {
//If this node has not been updated for a while, delete it
if v.(int64)+nodeList.Timeout < time.Now().Unix() {
nodeList.nodes.Delete(k)
nodeList.Logger.Sugar().Panicln("[[Timeout]:", k, "has been deleted]")
nodeList.Logger.Sugar().Warnln("[[Timeout]:", k, "has been deleted]")
} else {
nodes = append(nodes, k.(common.Node))
}
Expand All @@ -222,7 +222,7 @@ func (nodeList *NodeList) Publish(newMetadata []byte) {
return
}

nodeList.Logger.Sugar().Panicln("[Control]: Metadata Publish in", nodeList.LocalNode, "/ [Metadata]:", newMetadata)
nodeList.Logger.Sugar().Infoln("[Control]: Metadata Publish in", nodeList.LocalNode, "/ [Metadata]:", newMetadata)

// Add the local node to the infected node list
var infected = make(map[string]bool)
Expand Down
7 changes: 3 additions & 4 deletions modules/nodeList/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nodeList

import (
"encoding/json"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -131,7 +130,7 @@ func processRegularPacket(nodeList *NodeList, p common.Packet) {
nodeList.Set(node)
if p.IsUpdate {
nodeList.metadata.Store(p.Metadata)
nodeList.Logger.Sugar().Panicln("[Metadata]: Recv new node metadata, node info:", nodeList.LocalNode.Addr+":"+strconv.Itoa(nodeList.LocalNode.Port))
nodeList.Logger.Sugar().Infoln("[Metadata]: Recv new node metadata, node info:", nodeList.LocalNode.Addr+":"+strconv.Itoa(nodeList.LocalNode.Port))
}
broadcast(nodeList, p)
}
Expand Down Expand Up @@ -184,7 +183,7 @@ func broadcast(nodeList *NodeList, p common.Packet) {
for _, v := range targetNodes {
bs, err := json.Marshal(p)
if err != nil {
nodeList.Logger.Sugar().Panic("[Infection Error]:", err)
nodeList.Logger.Sugar().Panicln("[Infection Error]:", err)
}

// Send the packet
Expand Down Expand Up @@ -335,6 +334,6 @@ func listen(nodeList *NodeList, mq chan []byte) {
} else if nodeList.Protocol == "XDP" {
transport.XdpListen(nodeList.Xsk, mq)
} else {
fmt.Println("Protocol not supported, only UDP, TC and XDP.")
nodeList.Logger.Sugar().Panicln("Protocol not supported, only UDP, TC and XDP.")
}
}
14 changes: 13 additions & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,17 @@ func getGatewayIP() (net.IP, error) {
return nil, fmt.Errorf("No default gateway found")
}

// Convert an IPv4-mapped IPv6 address to IPv4 if necessary
func convertIPv4MappedIPv6ToIPv4(addr netip.Addr) netip.Addr {
if addr.Is4In6() {
v4Addr := addr.As4()
return netip.AddrFrom4(v4Addr)
}
return addr
}

func FindGatewayMAC(interfaceName string) (net.HardwareAddr, error) {
gatewayIP, err := getGatewayIP() // Assuming getGatewayIP returns net.IP, handle its error properly
gatewayIP, err := getGatewayIP() // getGatewayIP must return net.IP and handle its error
if err != nil {
return nil, fmt.Errorf("fail to get gateway IP: %v", err)
}
Expand All @@ -192,6 +201,9 @@ func FindGatewayMAC(interfaceName string) (net.HardwareAddr, error) {
return nil, fmt.Errorf("invalid IP address: %v", gatewayIP)
}

// Check for IPv4 mapped in IPv6 and convert if necessary
gatewayAddr = convertIPv4MappedIPv6ToIPv4(gatewayAddr)

mac, err := client.Resolve(gatewayAddr)
if err != nil {
return nil, fmt.Errorf("fail to resolve %v: %v", gatewayAddr, err)
Expand Down
36 changes: 0 additions & 36 deletions pkg/transport/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
//"log"
"net"

"github.com/asavie/xdp"
logger "github.com/kerwenwwer/eGossip/pkg/logger"
)

Expand Down Expand Up @@ -80,38 +79,3 @@ func UdpListen(logger *logger.Logger, addr string, port int, size int, mq chan [
mq <- b
}
}

func XdpListen(xsk *xdp.Socket, mq chan []byte) {
for {
// If there are any free slots on the Fill queue...
if n := xsk.NumFreeFillSlots(); n > 0 {
// ...then fetch up to that number of not-in-use
// descriptors and push them onto the Fill ring queue
// for the kernel to fill them with the received
// frames.
xsk.Fill(xsk.GetDescs(n))
}

// Wait for receive - meaning the kernel has
// produced one or more descriptors filled with a received
// frame onto the Rx ring queue.
numRx, _, err := xsk.Poll(-1)
if err != nil {
fmt.Printf("error: %v\n", err)
return
}

if numRx > 0 {
// Consume the descriptors filled with received frames
// from the Rx ring queue.
rxDescs := xsk.Receive(numRx)
// Print the received frames and also modify them
// in-place replacing the destination MAC address with
// broadcast address.
for i := 0; i < len(rxDescs); i++ {
pktData := xsk.GetFrame(rxDescs[i])
mq <- pktData
}
}
}
}
44 changes: 44 additions & 0 deletions pkg/transport/xdp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package transport

import (

//"log"

"github.com/asavie/xdp"
"github.com/kerwenwwer/eGossip/pkg/logger"
)

func XdpListen(xsk *xdp.Socket, mq chan []byte) {
for {
// If there are any free slots on the Fill queue...
if n := xsk.NumFreeFillSlots(); n > 0 {
// ...then fetch up to that number of not-in-use
// descriptors and push them onto the Fill ring queue
// for the kernel to fill them with the received
// frames.
xsk.Fill(xsk.GetDescs(n))
}

// Wait for receive - meaning the kernel has
// produced one or more descriptors filled with a received
// frame onto the Rx ring queue.
numRx, _, err := xsk.Poll(-1)
if err != nil {
logger.NewNopLogger().Sugar().Panicln("error: %v\n", err)
return
}

if numRx > 0 {
// Consume the descriptors filled with received frames
// from the Rx ring queue.
rxDescs := xsk.Receive(numRx)
// Print the received frames and also modify them
// in-place replacing the destination MAC address with
// broadcast address.
for i := 0; i < len(rxDescs); i++ {
pktData := xsk.GetFrame(rxDescs[i])
mq <- pktData
}
}
}
}

0 comments on commit 552c75f

Please sign in to comment.