Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): multicast udp socket support #22099

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a8563a3
feat: multicast udp socket support
jorgehermo9 Dec 31, 2024
ba9015f
Merge branch 'master' into feature/multicast-udp-source
jorgehermo9 Jan 18, 2025
1ab728f
test: add tests for multicast udp
jorgehermo9 Jan 19, 2025
b1e6c92
test: add tests for multicast udp
jorgehermo9 Jan 19, 2025
e63e2bf
chore: add changelog
jorgehermo9 Jan 19, 2025
0e32685
chore: remove dbg statement
jorgehermo9 Jan 19, 2025
adad168
chore: remove todo
jorgehermo9 Jan 19, 2025
72131fb
chore: remove todo
jorgehermo9 Jan 19, 2025
12bf074
chore: fix typo
jorgehermo9 Jan 19, 2025
3018327
docs: add docs for multicast_groups setting
jorgehermo9 Jan 19, 2025
588bb09
docs: update website cue
jorgehermo9 Jan 19, 2025
c140d42
fix: clippy lints
jorgehermo9 Jan 19, 2025
58223d5
chore: fix typo
jorgehermo9 Jan 19, 2025
6b96901
chore: fix typo
jorgehermo9 Jan 19, 2025
9d25576
Update src/sources/socket/mod.rs
jorgehermo9 Feb 5, 2025
5b7723c
feat: change panic to unimplemented
jorgehermo9 Feb 5, 2025
e9a5933
chore: add todo about init_udp_with_config
jorgehermo9 Feb 5, 2025
058f841
chore: add todo about init_udp_with_config
jorgehermo9 Feb 5, 2025
1ba5dd8
feat: change "addr" to "to" in udp test helpers
jorgehermo9 Feb 5, 2025
dc09eb5
feat: add new SocketMulticastGroupJoinError and refactor error test
jorgehermo9 Feb 5, 2025
4fb4e28
feat: add variables to logs
jorgehermo9 Feb 5, 2025
a2930d2
feat: add variables to logs
jorgehermo9 Feb 5, 2025
c03bcfb
feat: add INITIALIZING error stage
jorgehermo9 Mar 1, 2025
cdd37cd
Merge remote-tracking branch 'origin' into feature/multicast-udp-source
pront Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.d/5732_multicast_udp_socket_sources.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option
of that source. This allows the source to receive multicast packets from the specified multicast groups.

Note that in order to work propertly, the `socket` address must be set to `0.0.0.0` and not

Check warning on line 4 in changelog.d/5732_multicast_udp_socket_sources.feature.md

View workflow job for this annotation

GitHub Actions / Check Spelling

`propertly` is not a recognized word. (unrecognized-spelling)
to `127.0.0.1` (localhost) or any other specific IP address. If other IP address is used, the host's interface
will filter out the multicast packets as the packet target IP (multicast) would not match the host's interface IP.

authors: jorgehermo9
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
Expand Down Expand Up @@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
Expand Down
138 changes: 132 additions & 6 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod test {
use approx::assert_relative_eq;
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -374,7 +374,7 @@ mod test {
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
SourceSender,
Expand Down Expand Up @@ -899,12 +899,27 @@ mod test {

//////// UDP TESTS ////////
fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
send_packets_udp(addr, lines.into_iter().map(|line| line.into()))
send_lines_udp_from(next_addr(), addr, lines)
}

fn send_lines_udp_from(
from: SocketAddr,
addr: SocketAddr,
lines: impl IntoIterator<Item = String>,
) -> SocketAddr {
send_packets_udp_from(from, addr, lines.into_iter().map(|line| line.into()))
}

fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
let bind = next_addr();
let socket = UdpSocket::bind(bind)
send_packets_udp_from(next_addr(), addr, packets)
}

fn send_packets_udp_from(
from: SocketAddr,
addr: SocketAddr,
packets: impl IntoIterator<Item = Bytes>,
) -> SocketAddr {
let socket = UdpSocket::bind(from)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap();
Expand All @@ -926,7 +941,7 @@ mod test {
thread::sleep(Duration::from_millis(10));

// Done
bind
from
}

async fn init_udp_with_shutdown(
Expand Down Expand Up @@ -1303,6 +1318,117 @@ mod test {
.await;
}

#[tokio::test]
async fn multicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
// The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

// We must send packets to the same interface the `socket_address` is bound to
// in order to receive the multicast packets this `from` socket sends
// To do so, we use the `IPADDR_ANY` address
let from = next_addr_any();
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn multiple_multicast_addresses_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_addresses = (2..12)
.map(|i| {
let ip_address = format!("224.0.0.{i}").parse().unwrap();
ip_address
})
.collect::<Vec<Ipv4Addr>>();
let multicast_ip_socket_addresses = multicast_ip_addresses
.iter()
.map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
.collect::<Vec<SocketAddr>>();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = multicast_ip_addresses;
init_udp_with_config(tx, config).await;

let from = next_addr_any();
for multicast_ip_socket_address in multicast_ip_socket_addresses {
send_lines_udp_from(
from,
multicast_ip_socket_address,
[multicast_ip_socket_address.to_string()],
);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
multicast_ip_socket_address.to_string().into()
);
}
})
.await;
}

#[tokio::test]
async fn multicast_and_unicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

let from = next_addr_any();
// Send packet to multicast address
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);

// Send packet to unicast address
send_lines_udp_from(from, socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
#[should_panic]
async fn udp_invalid_multicast_group() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, _rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![invalid_multicast_ip_address];
init_udp_with_config(tx, config).await;
})
.await;
}

////////////// UNIX TEST LIBS //////////////

#[cfg(unix)]
Expand Down
51 changes: 50 additions & 1 deletion src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
Expand Down Expand Up @@ -41,6 +43,17 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// TODO: document this.
/// TODO: The join multicast method should fail if the address is not SocketListenAddr::SocketAddr.
/// multicast wont work with systemd{N} fd sockets.
/// Also, if the address is IPv4, the multicast address should be IPv4 too.
/// TODO: should we support a list of groups or a single group? The `join_multicast` method supports
/// just one group per call.
/// TODO: document that we use `IPv4Addr` and not `SocketAddr` for the multicast groups because
/// the `join_multicast_v6` is not supported due to the need of using an interface index.s
#[serde(default)]
pub(super) multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
Expand Down Expand Up @@ -118,6 +131,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
Expand Down Expand Up @@ -152,6 +166,41 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
socket.set_multicast_loop_v4(true).unwrap();
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
// We could support Ipv6 multicast with the
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
// and specifying the interface index as `0`, in order to bind all interfaces.
panic!("IPv6 multicast is not supported")
}
// TODO: if we need to support systemd{N} fd sockets, we should use the
// `UdpSocket::local_addr` method to get the address of the socket.
// that method can fail and I wonder if the user sets `IP_ADDR_ANY` (`0.0.0.0`) in the config,
// the `UdpSocket::local_addr` would return the real interface address that the
// socket is bound to, and not `IP_ADDR_ANY`. We need to use the same address
// for the multicast group join that the user has set in the config.
// if systemd{N} fd sockets are required to work too, we should investigate on this.
SocketListenAddr::SystemdFd(_) => {
panic!("Multicast for systemd fd sockets is not supported")
}
};
for group_addr in config.multicast_groups {
socket
.join_multicast_v4(group_addr, *listen_addr.ip())
.map_err(|error| {
// TODO: is this considered a `SocketBindError`? or should we create a new error for this case?
emit!(SocketBindError {
mode: SocketMode::Udp,
error,
})
})?;
info!(message = "Joined multicast group.", group = %group_addr);
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
Expand All @@ -173,7 +222,7 @@ pub(super) fn udp(
buf.resize(max_length + 1, 0);
tokio::select! {
recv = socket.recv_from(&mut buf) => {
let (byte_size, address) = match recv {
let (byte_size, address) = match dbg!(recv) {
Ok(res) => res,
Err(error) => {
#[cfg(windows)]
Expand Down
4 changes: 4 additions & 0 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ pub fn next_addr() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST))
}

pub fn next_addr_any() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}

pub fn next_addr_v6() -> SocketAddr {
next_addr_for_ip(IpAddr::V6(Ipv6Addr::LOCALHOST))
}
Expand Down
Loading