Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): multicast udp socket support #22099

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a8563a3
feat: multicast udp socket support
jorgehermo9 Dec 31, 2024
ba9015f
Merge branch 'master' into feature/multicast-udp-source
jorgehermo9 Jan 18, 2025
1ab728f
test: add tests for multicast udp
jorgehermo9 Jan 19, 2025
b1e6c92
test: add tests for multicast udp
jorgehermo9 Jan 19, 2025
e63e2bf
chore: add changelog
jorgehermo9 Jan 19, 2025
0e32685
chore: remove dbg statement
jorgehermo9 Jan 19, 2025
adad168
chore: remove todo
jorgehermo9 Jan 19, 2025
72131fb
chore: remove todo
jorgehermo9 Jan 19, 2025
12bf074
chore: fix typo
jorgehermo9 Jan 19, 2025
3018327
docs: add docs for multicast_groups setting
jorgehermo9 Jan 19, 2025
588bb09
docs: update website cue
jorgehermo9 Jan 19, 2025
c140d42
fix: clippy lints
jorgehermo9 Jan 19, 2025
58223d5
chore: fix typo
jorgehermo9 Jan 19, 2025
6b96901
chore: fix typo
jorgehermo9 Jan 19, 2025
9d25576
Update src/sources/socket/mod.rs
jorgehermo9 Feb 5, 2025
5b7723c
feat: change panic to unimplemented
jorgehermo9 Feb 5, 2025
e9a5933
chore: add todo about init_udp_with_config
jorgehermo9 Feb 5, 2025
058f841
chore: add todo about init_udp_with_config
jorgehermo9 Feb 5, 2025
1ba5dd8
feat: change "addr" to "to" in udp test helpers
jorgehermo9 Feb 5, 2025
dc09eb5
feat: add new SocketMulticastGroupJoinError and refactor error test
jorgehermo9 Feb 5, 2025
4fb4e28
feat: add variables to logs
jorgehermo9 Feb 5, 2025
a2930d2
feat: add variables to logs
jorgehermo9 Feb 5, 2025
c03bcfb
feat: add INITIALIZING error stage
jorgehermo9 Mar 1, 2025
cdd37cd
Merge remote-tracking branch 'origin' into feature/multicast-udp-source
pront Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.d/5732_multicast_udp_socket_sources.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `socket` source with `udp` mode now supports joining multicast groups via the `multicast_groups` option
of that source. This allows the source to receive multicast packets from the specified multicast groups.

Note that in order to work properly, the `socket` address must be set to `0.0.0.0` and not
to `127.0.0.1` (localhost) or any other specific IP address. If other IP address is used, the host's interface
will filter out the multicast packets as the packet target IP (multicast) would not match the host's interface IP.

authors: jorgehermo9
27 changes: 26 additions & 1 deletion lib/vector-config/src/stdlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
hash::Hash,
net::SocketAddr,
net::{Ipv4Addr, SocketAddr},
num::{
NonZeroI16, NonZeroI32, NonZeroI64, NonZeroI8, NonZeroU16, NonZeroU32, NonZeroU64,
NonZeroU8, NonZeroUsize,
Expand Down Expand Up @@ -402,6 +402,31 @@ impl ToValue for SocketAddr {
}
}

impl Configurable for Ipv4Addr {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::Ipv4Addr")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("An IPv4 address.");
metadata
}

fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
// TODO: We don't need anything other than a string schema to (de)serialize a `Ipv4Addr`,
// but we eventually should have validation since the format for the possible permutations
// is well-known and can be easily codified.
Ok(generate_string_schema())
}
}

impl ToValue for Ipv4Addr {
fn to_value(&self) -> Value {
Value::String(self.to_string())
}
}

impl Configurable for PathBuf {
fn referenceable_name() -> Option<&'static str> {
Some("stdlib::PathBuf")
Expand Down
33 changes: 33 additions & 0 deletions src/internal_events/socket.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::Ipv4Addr;

use metrics::{counter, histogram};
use vector_lib::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL};
use vector_lib::{
Expand Down Expand Up @@ -135,6 +137,37 @@ impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
}
}

#[derive(Debug)]
pub struct SocketMulticastGroupJoinError<E> {
pub error: E,
pub group_addr: Ipv4Addr,
pub interface: Ipv4Addr,
}

impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
fn emit(self) {
// Multicast groups are only used in UDP mode
let mode = SocketMode::Udp.as_str();
error!(
message = "Error joining multicast group.",
error = %self.error,
error_code = "socket_multicast_group_join",
error_type = error_type::IO_FAILED,
stage = error_stage::RECEIVING,
%mode,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
"error_code" => "socket_multicast_group_join",
"error_type" => error_type::IO_FAILED,
"stage" => error_stage::RECEIVING,
"mode" => mode,
)
.increment(1);
}
}

#[derive(Debug)]
pub struct SocketReceiveError<E> {
pub mode: SocketMode,
Expand Down
145 changes: 135 additions & 10 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod test {
use approx::assert_relative_eq;
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -373,8 +373,11 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
next_addr, next_addr_any, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
SourceSender,
Expand Down Expand Up @@ -898,21 +901,36 @@ mod test {
}

//////// UDP TESTS ////////
fn send_lines_udp(addr: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
send_packets_udp(addr, lines.into_iter().map(|line| line.into()))
fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> SocketAddr {
send_lines_udp_from(next_addr(), to, lines)
}

fn send_lines_udp_from(
from: SocketAddr,
to: SocketAddr,
lines: impl IntoIterator<Item = String>,
) -> SocketAddr {
send_packets_udp_from(from, to, lines.into_iter().map(|line| line.into()))
}

fn send_packets_udp(addr: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
let bind = next_addr();
let socket = UdpSocket::bind(bind)
fn send_packets_udp(to: SocketAddr, packets: impl IntoIterator<Item = Bytes>) -> SocketAddr {
send_packets_udp_from(next_addr(), to, packets)
}

fn send_packets_udp_from(
from: SocketAddr,
to: SocketAddr,
packets: impl IntoIterator<Item = Bytes>,
) -> SocketAddr {
let socket = UdpSocket::bind(from)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap();

for packet in packets {
assert_eq!(
socket
.send_to(&packet, addr)
.send_to(&packet, to)
.map_err(|error| panic!("{:}", error))
.ok()
.unwrap(),
Expand All @@ -926,7 +944,7 @@ mod test {
thread::sleep(Duration::from_millis(10));

// Done
bind
from
}

async fn init_udp_with_shutdown(
Expand Down Expand Up @@ -1303,6 +1321,113 @@ mod test {
.await;
}

#[tokio::test]
async fn multicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
// The socket address must be `IPADDR_ANY` (0.0.0.0) in order to receive multicast packets
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

// We must send packets to the same interface the `socket_address` is bound to
// in order to receive the multicast packets the `from` socket sends.
// To do so, we use the `IPADDR_ANY` address
let from = next_addr_any();
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn multiple_multicast_addresses_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_addresses = (2..12)
.map(|i| format!("224.0.0.{i}").parse().unwrap())
.collect::<Vec<Ipv4Addr>>();
let multicast_ip_socket_addresses = multicast_ip_addresses
.iter()
.map(|ip_address| SocketAddr::new(IpAddr::V4(*ip_address), socket_address.port()))
.collect::<Vec<SocketAddr>>();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = multicast_ip_addresses;
init_udp_with_config(tx, config).await;

let from = next_addr_any();
for multicast_ip_socket_address in multicast_ip_socket_addresses {
send_lines_udp_from(
from,
multicast_ip_socket_address,
[multicast_ip_socket_address.to_string()],
);

let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
multicast_ip_socket_address.to_string().into()
);
}
})
.await;
}

#[tokio::test]
async fn multicast_and_unicast_udp_message() {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let multicast_ip_address: Ipv4Addr = "224.0.0.2".parse().unwrap();
let multicast_socket_address =
SocketAddr::new(IpAddr::V4(multicast_ip_address), socket_address.port());
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![multicast_ip_address];
init_udp_with_config(tx, config).await;

let from = next_addr_any();
// Send packet to multicast address
send_lines_udp_from(from, multicast_socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);

// Send packet to unicast address
send_lines_udp_from(from, socket_address, ["test".to_string()]);
let event = rx.next().await.expect("must receive an event");
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
"test".into()
);
})
.await;
}

#[tokio::test]
async fn udp_invalid_multicast_group() {
assert_source_error(&COMPONENT_ERROR_TAGS, async {
let (tx, _rx) = SourceSender::new_test();
let socket_address = next_addr_any();
let invalid_multicast_ip_address: Ipv4Addr = "192.168.0.3".parse().unwrap();
let mut config = UdpConfig::from_address(socket_address.into());
config.multicast_groups = vec![invalid_multicast_ip_address];
init_udp_with_config(tx, config).await;
})
.await;
}

////////////// UNIX TEST LIBS //////////////

#[cfg(unix)]
Expand Down
50 changes: 49 additions & 1 deletion src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::{Ipv4Addr, SocketAddr};

use super::default_host_key;
use bytes::BytesMut;
use chrono::Utc;
Expand All @@ -20,7 +22,8 @@ use crate::{
codecs::Decoder,
event::Event,
internal_events::{
SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError,
SocketReceiveError, StreamClosedError,
},
net,
serde::default_decoding,
Expand All @@ -41,6 +44,21 @@ pub struct UdpConfig {
#[configurable(derived)]
address: SocketListenAddr,

/// List of IPv4 multicast groups to join on socket's binding process.
///
/// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`.
/// If any other address is used (such as `127.0.0.1` or an specific interface address), the
/// listening interface will filter out all multicast packets received,
/// as their target IP would be the one of the multicast group
/// and it will not match the socket's bound IP.
///
/// Note that this setting will only work if the source's address
/// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported
/// with multicast groups).
#[serde(default)]
#[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))]
pub(super) multicast_groups: Vec<Ipv4Addr>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
Expand Down Expand Up @@ -118,6 +136,7 @@ impl UdpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
multicast_groups: Vec::new(),
max_length: default_max_length(),
host_key: None,
port_key: default_port_key(),
Expand Down Expand Up @@ -152,6 +171,35 @@ pub(super) fn udp(
})
})?;

if !config.multicast_groups.is_empty() {
socket.set_multicast_loop_v4(true).unwrap();
let listen_addr = match config.address() {
SocketListenAddr::SocketAddr(SocketAddr::V4(addr)) => addr,
SocketListenAddr::SocketAddr(SocketAddr::V6(_)) => {
// We could support Ipv6 multicast with the
// https://doc.rust-lang.org/std/net/struct.UdpSocket.html#method.join_multicast_v6 method
// and specifying the interface index as `0`, in order to bind all interfaces.
unimplemented!("IPv6 multicast is not supported")
}
SocketListenAddr::SystemdFd(_) => {
unimplemented!("Multicast for systemd fd sockets is not supported")
}
};
for group_addr in config.multicast_groups {
let interface = *listen_addr.ip();
socket
.join_multicast_v4(group_addr, interface)
.map_err(|error| {
emit!(SocketMulticastGroupJoinError {
error,
group_addr,
interface,
})
})?;
info!(message = "Joined multicast group.", group = %group_addr);
}
}

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
Expand Down
4 changes: 4 additions & 0 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ pub fn next_addr() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST))
}

pub fn next_addr_any() -> SocketAddr {
next_addr_for_ip(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
}

pub fn next_addr_v6() -> SocketAddr {
next_addr_for_ip(IpAddr::V6(Ipv6Addr::LOCALHOST))
}
Expand Down
Loading