Skip to content

Commit

Permalink
Refactor dataplane code and fix ICMP integration test (#163)
Browse files Browse the repository at this point in the history
This PR refactors the dataplane code to be cleaner and more maintainable.
The `TCP_CONNECTIONS` map is now generic and aptly renamed to
`LB_CONNECTIONS`. The `BLIXT_CONNTRACK` map has been removed and the UDP
and ICMP traffic logic now uses `LB_CONNECTIONS`.
Furthermore, the ICMP integration test `TestUDPRouteNoReach` has been
fixed. The test now listens for raw ICMP packets originating from the
Gateway and thus needs to be run as root. By default, the test is
skipped. The test can be run by specifying the env var RUN_ICMP_TEST. A
Makefile target, `test.icmp.integration`, has been added to make this
straightforward.
  • Loading branch information
k8s-ci-robot authored Jan 12, 2024
2 parents 37a0d78 + 4d29d6a commit 505becf
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 118 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,22 @@ jobs:
- name: build images
run: make build.all.images TAG=integration-tests

- name: Set cluster name in enviornment
run: echo "BLIXT_TEST_CLUSTER_NAME=`echo blixt-ci-${GITHUB_SHA} | cut -c1-8`" >> $GITHUB_ENV

- name: run integration tests
run: make test.integration
env:
BLIXT_TEST_KEEP_CLUSTER: true
BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
TAG: "integration-tests"

- name: run integration tests
run: make test.icmp.integration
env:
BLIXT_USE_EXISTING_KIND_CLUSTER: true
BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
Expand Down
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ test.integration: manifests generate fmt vet
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
GOFLAGS="-tags=integration_tests" go test -race -v ./test/integration/...

.PHONY: test.icmp.integration
test.icmp.integration:
go clean -testcache
# This needs to run as sudo as the test involves listening for raw ICMP packets, which
# requires you to be root.
sudo env PATH=$(PATH) \
BLIXT_CONTROLPLANE_IMAGE=$(BLIXT_CONTROLPLANE_IMAGE):$(TAG) \
BLIXT_DATAPLANE_IMAGE=$(BLIXT_DATAPLANE_IMAGE):$(TAG) \
BLIXT_UDP_SERVER_IMAGE=$(BLIXT_UDP_SERVER_IMAGE):$(TAG) \
RUN_ICMP_TEST=true \
go test --tags=integration_tests -run "TestUDPRouteNoReach" -race -v ./test/integration/...

.PHONY: test.performance
test.performance: manifests generate fmt vet
go clean -testcache
Expand Down
4 changes: 2 additions & 2 deletions dataplane/api-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use aya::maps::{HashMap, MapData};
use tonic::transport::Server;

use backends::backends_server::BackendsServer;
use common::{BackendKey, BackendList, ClientKey, TCPBackend};
use common::{BackendKey, BackendList, ClientKey, LoadBalancerMapping};

pub async fn start(
addr: Ipv4Addr,
port: u16,
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
tcp_conns_map: HashMap<MapData, ClientKey, LoadBalancerMapping>,
) -> Result<(), Error> {
let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map);
// TODO: mTLS https://github.com/Kong/blixt/issues/50
Expand Down
12 changes: 6 additions & 6 deletions dataplane/api-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ use tonic::{Request, Response, Status};
use crate::backends::backends_server::Backends;
use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip};
use crate::netutils::{if_name_for_routing_ip, if_nametoindex};
use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY};
use common::{Backend, BackendKey, BackendList, ClientKey, LoadBalancerMapping, BACKENDS_ARRAY_CAPACITY};

pub struct BackendService {
backends_map: Arc<Mutex<HashMap<MapData, BackendKey, BackendList>>>,
gateway_indexes_map: Arc<Mutex<HashMap<MapData, BackendKey, u16>>>,
tcp_conns_map: Arc<Mutex<HashMap<MapData, ClientKey, TCPBackend>>>,
tcp_conns_map: Arc<Mutex<HashMap<MapData, ClientKey, LoadBalancerMapping>>>,
}

impl BackendService {
pub fn new(
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
tcp_conns_map: HashMap<MapData, ClientKey, LoadBalancerMapping>,
) -> BackendService {
BackendService {
backends_map: Arc::new(Mutex::new(backends_map)),
Expand Down Expand Up @@ -65,15 +65,15 @@ impl BackendService {
let mut tcp_conns_map = self.tcp_conns_map.lock().await;
for item in tcp_conns_map
.iter()
.collect::<Vec<Result<(ClientKey, TCPBackend), MapError>>>()
.collect::<Vec<Result<(ClientKey, LoadBalancerMapping), MapError>>>()
{
match item {
Ok((
client_key,
TCPBackend {
LoadBalancerMapping {
backend: _,
backend_key,
state: _,
tcp_state: _,
},
)) => {
if backend_key == key {
Expand Down
6 changes: 3 additions & 3 deletions dataplane/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ unsafe impl aya::Pod for TCPState {}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
pub struct TCPBackend {
pub struct LoadBalancerMapping {
pub backend: Backend,
pub backend_key: BackendKey,
pub state: TCPState,
pub tcp_state: Option<TCPState>,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for TCPBackend {}
unsafe impl aya::Pod for LoadBalancerMapping {}
18 changes: 11 additions & 7 deletions dataplane/ebpf/src/egress/icmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use core::mem;

use aya_bpf::{bindings::TC_ACT_PIPE, helpers::bpf_csum_diff, programs::TcContext};
use aya_log_ebpf::info;
use common::ClientKey;
use network_types::{eth::EthHdr, icmp::IcmpHdr, ip::Ipv4Hdr};

use crate::{
utils::{csum_fold_helper, ptr_at},
BLIXT_CONNTRACK,
LB_CONNECTIONS,
};

const ICMP_PROTO_TYPE_UNREACH: u8 = 3;
Expand All @@ -31,18 +32,21 @@ pub fn handle_icmp_egress(ctx: TcContext) -> Result<i32, i64> {
}

let dest_addr = unsafe { (*ip_hdr).dst_addr };

let new_src = unsafe { BLIXT_CONNTRACK.get(&dest_addr) }.ok_or(TC_ACT_PIPE)?;
let client_key = &ClientKey {
ip: dest_addr.to_be(),
port: 0,
};
let lb_mapping = unsafe { LB_CONNECTIONS.get(client_key) }.ok_or(TC_ACT_PIPE)?;

info!(
&ctx,
"Received a ICMP Unreachable packet destined for svc ip: {:i} ",
u32::from_be(unsafe { (*ip_hdr).dst_addr })
u32::from_be(dest_addr)
);

// redirect icmp unreachable message back to client
unsafe {
(*ip_hdr).src_addr = new_src.0;
(*ip_hdr).src_addr = lb_mapping.backend_key.ip.to_be();
(*ip_hdr).check = 0;
}

Expand All @@ -62,7 +66,7 @@ pub fn handle_icmp_egress(ctx: TcContext) -> Result<i32, i64> {
unsafe { ptr_at(&ctx, icmp_header_offset + IcmpHdr::LEN) }?;

unsafe {
(*icmp_inner_ip_hdr).dst_addr = new_src.0;
(*icmp_inner_ip_hdr).dst_addr = lb_mapping.backend_key.ip.to_be();
(*icmp_inner_ip_hdr).check = 0;
}

Expand All @@ -77,7 +81,7 @@ pub fn handle_icmp_egress(ctx: TcContext) -> Result<i32, i64> {
} as u64;
unsafe { (*icmp_inner_ip_hdr).check = csum_fold_helper(full_cksum) };

unsafe { BLIXT_CONNTRACK.remove(&dest_addr)? };
unsafe { LB_CONNECTIONS.remove(client_key)? };

return Ok(TC_ACT_PIPE);
}
18 changes: 9 additions & 9 deletions dataplane/ebpf/src/egress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, ptr_at, update_tcp_conns},
TCP_CONNECTIONS,
LB_CONNECTIONS,
};

pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
Expand All @@ -36,24 +36,24 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
ip: u32::from_be(client_addr),
port: u16::from_be(dest_port) as u32,
};
let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?;
let lb_mapping = unsafe { LB_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?;

info!(
&ctx,
"Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}",
u32::from_be(client_addr),
u16::from_be(dest_port),
tcp_backend.backend_key.ip,
tcp_backend.backend_key.port,
lb_mapping.backend_key.ip,
lb_mapping.backend_key.port,
);

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
// SNAT the ip address
unsafe {
(*ip_hdr).src_addr = tcp_backend.backend_key.ip.to_be();
(*ip_hdr).src_addr = lb_mapping.backend_key.ip.to_be();
};
// SNAT the port
unsafe { (*tcp_hdr).source = u16::from_be(tcp_backend.backend_key.port as u16) };
unsafe { (*tcp_hdr).source = u16::from_be(lb_mapping.backend_key.port as u16) };

if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() {
info!(&ctx, "Iphdr is out of bounds");
Expand All @@ -79,12 +79,12 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
// from our map.
if tcp_hdr_ref.rst() == 1 {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
LB_CONNECTIONS.remove(&client_key)?;
}
}

let mut tcp_bk = *tcp_backend;
update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_bk)?;
let mut mapping = *lb_mapping;
update_tcp_conns(tcp_hdr_ref, &client_key, &mut mapping)?;

Ok(TC_ACT_PIPE)
}
29 changes: 19 additions & 10 deletions dataplane/ebpf/src/ingress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, ptr_at, update_tcp_conns},
BACKENDS, GATEWAY_INDEXES, TCP_CONNECTIONS,
BACKENDS, GATEWAY_INDEXES, LB_CONNECTIONS,
};
use common::{
Backend, BackendKey, ClientKey, LoadBalancerMapping, TCPState, BACKENDS_ARRAY_CAPACITY,
};
use common::{Backend, BackendKey, ClientKey, TCPBackend, TCPState, BACKENDS_ARRAY_CAPACITY};

pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
let ip_hdr: *mut Ipv4Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? };
Expand All @@ -41,14 +43,14 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
// Flag to check whether this is a new connection.
let mut new_conn = false;
// The state of this TCP connection.
let mut tcp_state = TCPState::default();
let mut tcp_state = Some(TCPState::default());

// Try to find the backend previously used for this connection. If not found, it means that
// this is a new connection, so assign it the next backend in line.
if let Some(val) = unsafe { TCP_CONNECTIONS.get(&client_key) } {
if let Some(val) = unsafe { LB_CONNECTIONS.get(&client_key) } {
backend = val.backend;
tcp_state = val.state;
backend_key = val.backend_key;
tcp_state = val.tcp_state;
} else {
new_conn = true;

Expand Down Expand Up @@ -76,6 +78,13 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
backend = backend_list.backends[0];
if let Some(val) = backend_list.backends.get(*backend_index as usize) {
backend = *val;
} else {
debug!(
&ctx,
"Failed to find backend in backends_list at index {}, falling back to 0th index; backends_len: {} ",
*backend_index,
backend_list.backends_len
)
}

// move the index to the next backend in our list
Expand Down Expand Up @@ -132,16 +141,16 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
)
};

let mut tcp_backend = TCPBackend {
let mut lb_mapping = LoadBalancerMapping {
backend,
backend_key,
state: tcp_state,
tcp_state,
};

// If the connection is new, then record it in our map for future tracking.
if new_conn {
unsafe {
TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?;
LB_CONNECTIONS.insert(&client_key, &lb_mapping, 0_u64)?;
}

// since this is a new connection, there is nothing else to do, so exit early
Expand All @@ -155,11 +164,11 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
// from our map.
if tcp_hdr_ref.rst() == 1 {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
LB_CONNECTIONS.remove(&client_key)?;
}
}

update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_backend)?;
update_tcp_conns(tcp_hdr_ref, &client_key, &mut lb_mapping)?;

info!(&ctx, "redirect action: {}", action);
Ok(action as i32)
Expand Down
Loading

0 comments on commit 505becf

Please sign in to comment.