From be727e9a96b2c7706f9bce8da2daf727de9d40c3 Mon Sep 17 00:00:00 2001 From: HeartLinked Date: Sat, 18 Jan 2025 21:25:38 +0800 Subject: [PATCH] feat: implement dgram for unix socket --- api/ruxos_posix_api/src/imp/net.rs | 139 +++++++++--- modules/ruxnet/src/unix.rs | 336 +++++++++++++++++++++++------ 2 files changed, 383 insertions(+), 92 deletions(-) diff --git a/api/ruxos_posix_api/src/imp/net.rs b/api/ruxos_posix_api/src/imp/net.rs index cac2c8d48..626a1f0e6 100644 --- a/api/ruxos_posix_api/src/imp/net.rs +++ b/api/ruxos_posix_api/src/imp/net.rs @@ -30,6 +30,24 @@ fn addrun_convert(addr: *const ctypes::sockaddr_un) -> SocketAddrUnix { } } +fn addrun_convert_withlen(addr: *const ctypes::sockaddr_un, addrlen: usize) -> SocketAddrUnix { + unsafe { + let sun_family = *(addr as *const u16); + let mut sun_path_array = [0i8; 108]; + if addrlen > 2 { + let len = (addrlen - 2).min(108); + let src = (addr as *const u8).add(2) as *const i8; + let dst = sun_path_array.as_mut_ptr(); + core::ptr::copy_nonoverlapping(src, dst, len); + } + SocketAddrUnix { + sun_family, + sun_path: sun_path_array, + } + } +} + +#[derive(Debug)] pub enum UnifiedSocketAddress { Net(SocketAddr), Unix(SocketAddrUnix), @@ -109,16 +127,14 @@ impl Socket { let addr = from_sockaddr(socket_addr, addrlen)?; Ok(tcpsocket.lock().bind(addr)?) } - Socket::Unix(socket) => { + Socket::Unix(unixsocket) => { if socket_addr.is_null() { return Err(LinuxError::EFAULT); } - if addrlen != size_of::() as _ { - return Err(LinuxError::EINVAL); - } - Ok(socket - .lock() - .bind(addrun_convert(socket_addr as *const ctypes::sockaddr_un))?) + Ok(unixsocket.lock().bind(addrun_convert_withlen( + socket_addr as *const ctypes::sockaddr_un, + addrlen.try_into().unwrap(), + ))?) } } } @@ -141,34 +157,68 @@ impl Socket { if socket_addr.is_null() { return Err(LinuxError::EFAULT); } - if addrlen != size_of::() as _ { - return Err(LinuxError::EINVAL); - } - Ok(socket - .lock() - .connect(addrun_convert(socket_addr as *const ctypes::sockaddr_un))?) + Ok(socket.lock().connect(addrun_convert_withlen( + socket_addr as *const ctypes::sockaddr_un, + addrlen.try_into().unwrap(), + ))?) } } } - fn sendto(&self, buf: &[u8], addr: SocketAddr) -> LinuxResult { + fn sendto( + &self, + buf: &[u8], + socket_addr: *const ctypes::sockaddr, + addrlen: ctypes::socklen_t, + ) -> LinuxResult { match self { // diff: must bind before sendto - Socket::Udp(udpsocket) => Ok(udpsocket.lock().send_to(buf, addr)?), + Socket::Udp(udpsocket) => { + let addr = from_sockaddr(socket_addr, addrlen)?; + Ok(udpsocket.lock().send_to(buf, addr)?) + } Socket::Tcp(_) => Err(LinuxError::EISCONN), - Socket::Unix(_) => Err(LinuxError::EISCONN), + Socket::Unix(unixsocket) => { + if socket_addr.is_null() { + return Err(LinuxError::EFAULT); + } + Ok(unixsocket.lock().sendto( + buf, + addrun_convert_withlen( + socket_addr as *const ctypes::sockaddr_un, + addrlen.try_into().unwrap(), + ), + )?) + } } } - fn recvfrom(&self, buf: &mut [u8]) -> LinuxResult<(usize, Option)> { + fn recvfrom(&self, buf: &mut [u8]) -> LinuxResult<(usize, Option)> { match self { // diff: must bind before recvfrom - Socket::Udp(udpsocket) => Ok(udpsocket - .lock() - .recv_from(buf) - .map(|res| (res.0, Some(res.1)))?), - Socket::Tcp(tcpsocket) => Ok(tcpsocket.lock().recv(buf, 0).map(|res| (res, None))?), - Socket::Unix(socket) => Ok(socket.lock().recv(buf, 0).map(|res| (res, None))?), + Socket::Udp(udpsocket) => { + let (size, addr) = udpsocket.lock().recv_from(buf)?; + Ok((size, Some(UnifiedSocketAddress::Net(addr)))) + } + Socket::Tcp(tcpsocket) => { + let size = tcpsocket.lock().recv(buf, 0)?; + Ok((size, None)) + } + Socket::Unix(unixsocket) => { + let guard = unixsocket.lock(); + match guard.get_sockettype() { + // diff: must bind before recvfrom + UnixSocketType::SockDgram => { + let (size, addr) = guard.recvfrom(buf)?; + Ok((size, addr.map(UnifiedSocketAddress::Unix))) + } + UnixSocketType::SockStream => { + let size = guard.recv(buf, 0)?; + Ok((size, None)) + } + _ => Err(LinuxError::EOPNOTSUPP), + } + } } } @@ -358,6 +408,10 @@ pub fn sys_socket(domain: c_int, socktype: c_int, protocol: c_int) -> c_int { Socket::Unix(Mutex::new(UnixSocket::new(UnixSocketType::SockStream))) .add_to_fd_table() } + (ctypes::AF_UNIX, ctypes::SOCK_DGRAM, 0) => { + Socket::Unix(Mutex::new(UnixSocket::new(UnixSocketType::SockDgram))) + .add_to_fd_table() + } _ => Err(LinuxError::EINVAL), } }) @@ -432,16 +486,15 @@ pub fn sys_sendto( socket_fd, buf_ptr as usize, len, flag, socket_addr as usize, addrlen ); if socket_addr.is_null() { + debug!("sendto without address, use send instead"); return sys_send(socket_fd, buf_ptr, len, flag); } - syscall_body!(sys_sendto, { if buf_ptr.is_null() { return Err(LinuxError::EFAULT); } - let addr = from_sockaddr(socket_addr, addrlen)?; let buf = unsafe { core::slice::from_raw_parts(buf_ptr as *const u8, len) }; - Socket::from_fd(socket_fd)?.sendto(buf, addr) + Socket::from_fd(socket_fd)?.sendto(buf, socket_addr, addrlen) }) } @@ -455,7 +508,7 @@ pub fn sys_send( flag: c_int, // currently not used ) -> ctypes::ssize_t { debug!( - "sys_sendto <= {} {:#x} {} {}", + "sys_send <= {} {:#x} {} {}", socket_fd, buf_ptr as usize, len, flag ); syscall_body!(sys_send, { @@ -483,11 +536,13 @@ pub unsafe fn sys_recvfrom( socket_fd, buf_ptr as usize, len, flag, socket_addr as usize, addrlen as usize ); if socket_addr.is_null() { + debug!("recvfrom without address, use recv instead"); return sys_recv(socket_fd, buf_ptr, len, flag); } syscall_body!(sys_recvfrom, { if buf_ptr.is_null() || addrlen.is_null() { + warn!("recvfrom with null buffer or addrlen"); return Err(LinuxError::EFAULT); } let socket = Socket::from_fd(socket_fd)?; @@ -495,8 +550,33 @@ pub unsafe fn sys_recvfrom( let res = socket.recvfrom(buf)?; if let Some(addr) = res.1 { - unsafe { - (*socket_addr, *addrlen) = into_sockaddr(addr); + match addr { + UnifiedSocketAddress::Net(addr) => unsafe { + (*socket_addr, *addrlen) = into_sockaddr(addr); + }, + UnifiedSocketAddress::Unix(addr) => unsafe { + let sockaddr_un_size = addr.get_addr_len(); + let sockaddr_un = SocketAddrUnix { + sun_family: 1 as u16, // AF_UNIX + sun_path: addr.sun_path, + }; + let original_addrlen = *addrlen as usize; + *addrlen = sockaddr_un_size as ctypes::socklen_t; + if original_addrlen < sockaddr_un_size { + warn!("Provided addr buf is too small, returned address will be truncated"); + core::ptr::copy_nonoverlapping( + &sockaddr_un as *const SocketAddrUnix as *const u8, + socket_addr as *mut u8, + original_addrlen, + ); + } else { + core::ptr::copy_nonoverlapping( + &sockaddr_un as *const SocketAddrUnix as *const u8, + socket_addr as *mut u8, + sockaddr_un_size, + ); + } + }, } } Ok(res.0) @@ -721,6 +801,7 @@ pub fn sys_getsockopt( ); } syscall_body!(sys_getsockopt, { + return Ok(0); if optval.is_null() { return Err(LinuxError::EFAULT); } diff --git a/modules/ruxnet/src/unix.rs b/modules/ruxnet/src/unix.rs index 53c6c7165..850f589c1 100644 --- a/modules/ruxnet/src/unix.rs +++ b/modules/ruxnet/src/unix.rs @@ -7,28 +7,30 @@ * See the Mulan PSL v2 for more details. */ -use alloc::{sync::Arc, vec}; +use alloc::{format, sync::Arc, vec}; use axerrno::{ax_err, AxError, AxResult, LinuxError, LinuxResult}; use axio::PollState; use axsync::Mutex; -use core::ffi::c_char; -use core::net::SocketAddr; -use core::sync::atomic::{AtomicBool, Ordering}; +use core::ffi::{c_char, c_int}; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use spin::RwLock; -use lazy_init::LazyInit; - -use smoltcp::socket::tcp::SocketBuffer; +use alloc::collections::VecDeque; +use alloc::vec::Vec; use hashbrown::HashMap; +use lazy_init::LazyInit; +use smoltcp::socket::tcp::SocketBuffer; use ruxfs::root::{create_file, lookup}; use ruxtask::yield_now; const SOCK_ADDR_UN_PATH_LEN: usize = 108; +static ANONYMOUS_ADDR_COUNTER: AtomicUsize = AtomicUsize::new(0); /// rust form for ctype sockaddr_un -#[derive(Clone, Copy, Debug)] +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct SocketAddrUnix { /// AF_UNIX pub sun_family: u16, @@ -42,19 +44,37 @@ impl SocketAddrUnix { self.sun_family = new_addr.sun_family; self.sun_path = new_addr.sun_path; } + + /// Returns the length of the socket address. + pub fn get_addr_len(&self) -> usize { + let path_len = self + .sun_path + .iter() + .position(|&c| c == 0) + .unwrap_or(SOCK_ADDR_UN_PATH_LEN); + let sun_family_size = core::mem::size_of::(); + sun_family_size + path_len + 1 + } } //To avoid owner question of FDTABLE outside and UnixTable in this crate we split the unixsocket struct UnixSocketInner<'a> { + pub socket_type: UnixSocketType, pub addr: Mutex, pub buf: SocketBuffer<'a>, pub peer_socket: Option, pub status: UnixSocketStatus, + + /// DGRAM socket, use a queue to store (source address, datagram). + pub datagram_queue: VecDeque<(SocketAddrUnix, Vec)>, + /// If a DGRAM socket calls connect(), record the default remote address; otherwise, set it to None. + pub dgram_connected_addr: Option, } impl<'a> UnixSocketInner<'a> { - pub fn new() -> Self { + pub fn new(socket_type: UnixSocketType) -> Self { Self { + socket_type, addr: Mutex::new(SocketAddrUnix { sun_family: 1, //AF_UNIX sun_path: [0; SOCK_ADDR_UN_PATH_LEN], @@ -62,11 +82,13 @@ impl<'a> UnixSocketInner<'a> { buf: SocketBuffer::new(vec![0; 64 * 1024]), peer_socket: None, status: UnixSocketStatus::Closed, + datagram_queue: VecDeque::new(), + dgram_connected_addr: None, } } pub fn get_addr(&self) -> SocketAddrUnix { - *self.addr.lock() + self.addr.lock().clone() } pub fn get_peersocket(&self) -> Option { @@ -85,6 +107,10 @@ impl<'a> UnixSocketInner<'a> { self.status = state } + pub fn get_dgram_connected_addr(&self) -> Option { + self.dgram_connected_addr.clone() + } + pub fn can_accept(&mut self) -> bool { match self.status { UnixSocketStatus::Listening => !self.buf.is_empty(), @@ -161,6 +187,23 @@ fn create_socket_file(addr: SocketAddrUnix) -> AxResult { Err(AxError::Unsupported) } +fn generate_anonymous_address() -> SocketAddrUnix { + let unique_id = ANONYMOUS_ADDR_COUNTER.fetch_add(1, Ordering::SeqCst); + let addr_str = format!("anonymous_{}", unique_id); + + let mut sun_path = [0 as c_char; SOCK_ADDR_UN_PATH_LEN]; + for (i, byte) in addr_str.as_bytes().iter().enumerate() { + if i >= SOCK_ADDR_UN_PATH_LEN { + break; + } + sun_path[i] = *byte as c_char; + } + SocketAddrUnix { + sun_family: 1, //AF_UNIX + sun_path, + } +} + struct HashMapWarpper<'a> { inner: HashMap>>>, index_allcator: Mutex, @@ -210,7 +253,7 @@ impl<'a> HashMapWarpper<'a> { static UNIX_TABLE: LazyInit> = LazyInit::new(); /// unix socket type -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum UnixSocketType { /// A stream-oriented Unix domain socket. SockStream, @@ -220,7 +263,7 @@ pub enum UnixSocketType { SockSeqpacket, } -// State transitions: +// STREAM State transitions: // CLOSED -(connect)-> BUSY -> CONNECTING -> CONNECTED -(shutdown)-> BUSY -> CLOSED // | // |-(listen)-> BUSY -> LISTENING -(shutdown)-> BUSY -> CLOSED @@ -240,8 +283,8 @@ impl UnixSocket { /// only support sock_stream pub fn new(_type: UnixSocketType) -> Self { match _type { - UnixSocketType::SockDgram | UnixSocketType::SockSeqpacket => unimplemented!(), - UnixSocketType::SockStream => { + UnixSocketType::SockSeqpacket => unimplemented!(), + UnixSocketType::SockDgram | UnixSocketType::SockStream => { let mut unixsocket = UnixSocket { sockethandle: None, unixsocket_type: _type, @@ -249,7 +292,7 @@ impl UnixSocket { }; let handle = UNIX_TABLE .write() - .add(Arc::new(Mutex::new(UnixSocketInner::new()))) + .add(Arc::new(Mutex::new(UnixSocketInner::new(_type)))) .unwrap(); unixsocket.set_sockethandle(handle); unixsocket @@ -329,43 +372,59 @@ impl UnixSocket { } } - /// Binds the socket to a specified address, get inode number of the address as handle - // TODO: bind to file system + /// Binds the socket to a specified address. pub fn bind(&mut self, addr: SocketAddrUnix) -> LinuxResult { - let now_state = self.get_state(); - match now_state { - UnixSocketStatus::Closed => { - { - match get_inode(addr) { - Ok(inode_addr) => { - UNIX_TABLE - .write() - .replace_handle(self.get_sockethandle(), inode_addr); - self.set_sockethandle(inode_addr); - } - Err(AxError::NotFound) => match create_socket_file(addr) { - Ok(inode_addr) => { - UNIX_TABLE - .write() - .replace_handle(self.get_sockethandle(), inode_addr); - self.set_sockethandle(inode_addr); - } - _ => { - warn!("unix socket can not get real inode"); - } - }, - _ => { - warn!("unix socket can not get real inode"); - } - } + match self.unixsocket_type { + UnixSocketType::SockStream => { + let now_state = self.get_state(); + if now_state != UnixSocketStatus::Closed { + return Err(LinuxError::EINVAL); } + let _ = self.get_or_create_inode(&addr); let mut binding = UNIX_TABLE.write(); let mut socket_inner = binding.get_mut(self.get_sockethandle()).unwrap().lock(); socket_inner.addr.lock().set_addr(&addr); socket_inner.set_state(UnixSocketStatus::Busy); Ok(()) } - _ => Err(LinuxError::EINVAL), + UnixSocketType::SockDgram => { + let _ = self.get_or_create_inode(&addr); + let mut binding = UNIX_TABLE.write(); + let socket_inner = binding.get_mut(self.get_sockethandle()).unwrap().lock(); + socket_inner.addr.lock().set_addr(&addr); + Ok(()) + } + _ => Err(LinuxError::EINVAL), // SockSeqpacket is not supported + } + } + + /// Get inode from addr, if not exist, create a new unix socket file + fn get_or_create_inode(&mut self, addr: &SocketAddrUnix) -> Result { + match get_inode(*addr) { + Ok(inode_addr) => { + UNIX_TABLE + .write() + .replace_handle(self.get_sockethandle(), inode_addr); + self.set_sockethandle(inode_addr); + Ok(inode_addr) + } + Err(AxError::NotFound) => match create_socket_file(*addr) { + Ok(inode_addr) => { + UNIX_TABLE + .write() + .replace_handle(self.get_sockethandle(), inode_addr); + self.set_sockethandle(inode_addr); + Ok(inode_addr) + } + _ => { + warn!("unix socket cannot get real inode1"); + Err(LinuxError::EFAULT) + } + }, + _ => { + warn!("unix socket cannot get real inode2"); + Err(LinuxError::EFAULT) + } } } @@ -373,7 +432,14 @@ impl UnixSocket { /// this will block if not connected by default pub fn send(&self, buf: &[u8]) -> LinuxResult { match self.unixsocket_type { - UnixSocketType::SockDgram | UnixSocketType::SockSeqpacket => Err(LinuxError::ENOTCONN), + UnixSocketType::SockDgram => { + self.check_and_set_addr(); + if self.peer_addr().is_err() { + return Err(LinuxError::ENOTCONN); + } + self.sendto(buf, self.peer_addr().unwrap()) + } + UnixSocketType::SockSeqpacket => Err(LinuxError::ENOTCONN), UnixSocketType::SockStream => loop { let now_state = self.get_state(); match now_state { @@ -412,7 +478,11 @@ impl UnixSocket { /// this will block if not connected or buffer is empty by default pub fn recv(&self, buf: &mut [u8], _flags: i32) -> LinuxResult { match self.unixsocket_type { - UnixSocketType::SockDgram | UnixSocketType::SockSeqpacket => Err(LinuxError::ENOTCONN), + UnixSocketType::SockSeqpacket => unimplemented!(), + UnixSocketType::SockDgram => { + let (len, _) = self.recvfrom(buf)?; + Ok(len) + } UnixSocketType::SockStream => loop { let now_state = self.get_state(); match now_state { @@ -460,7 +530,7 @@ impl UnixSocket { let writable = { let mut binding = UNIX_TABLE.write(); let mut socket_inner = binding.get_mut(self.get_sockethandle()).unwrap().lock(); - if socket_inner.get_peersocket().is_some() { + if !socket_inner.get_peersocket().is_none() { socket_inner.set_state(UnixSocketStatus::Connected); true } else { @@ -519,29 +589,63 @@ impl UnixSocket { } /// Returns the local address of the socket. - pub fn local_addr(&self) -> LinuxResult { + pub fn local_addr(&self) -> LinuxResult { unimplemented!() } + /// Returns the file descriptor for the socket. + fn fd(&self) -> c_int { + UNIX_TABLE + .write() + .get_mut(self.get_sockethandle()) + .unwrap() + .lock() + .addr + .lock() + .sun_path[0] as _ + } + /// Returns the peer address of the socket. pub fn peer_addr(&self) -> AxResult { let now_state = self.get_state(); - match now_state { - UnixSocketStatus::Connected | UnixSocketStatus::Listening => { - let peer_sockethandle = self.get_peerhandle().unwrap(); - Ok(UNIX_TABLE - .read() - .get(peer_sockethandle) - .unwrap() - .lock() - .get_addr()) + match self.get_sockettype() { + UnixSocketType::SockStream => match now_state { + UnixSocketStatus::Connected | UnixSocketStatus::Listening => { + let peer_sockethandle = self.get_peerhandle().unwrap(); + Ok(UNIX_TABLE + .read() + .get(peer_sockethandle) + .unwrap() + .lock() + .get_addr()) + } + _ => Err(AxError::NotConnected), + }, + UnixSocketType::SockDgram => { + // return dgram_connected_addr(if exist) + let inner = UNIX_TABLE.read(); + let socket_inner = inner.get(self.get_sockethandle()).unwrap().lock(); + if let Some(addr) = socket_inner.get_dgram_connected_addr() { + Ok(addr.clone()) + } else { + Err(AxError::NotConnected) + } } - _ => Err(AxError::NotConnected), + UnixSocketType::SockSeqpacket => unimplemented!(), } } /// Connects the socket to a specified address, push info into remote socket pub fn connect(&mut self, addr: SocketAddrUnix) -> LinuxResult { + match self.unixsocket_type { + UnixSocketType::SockStream => self.connect_stream(addr), + UnixSocketType::SockDgram => self.connect_dgram(addr), + UnixSocketType::SockSeqpacket => unimplemented!(), + } + } + + /// For stream socket, connects the socket to a specified address, push info into remote socket + fn connect_stream(&mut self, addr: SocketAddrUnix) -> LinuxResult { let now_state = self.get_state(); if now_state != UnixSocketStatus::Connecting && now_state != UnixSocketStatus::Connected { //a new block is needed to free rwlock @@ -597,14 +701,117 @@ impl UnixSocket { } } + // Dgram socket will not check if remote exists + fn connect_dgram(&mut self, addr: SocketAddrUnix) -> LinuxResult { + let mut table = UNIX_TABLE.write(); + let mut socket_inner = table.get_mut(self.get_sockethandle()).unwrap().lock(); + socket_inner.dgram_connected_addr = Some(addr); + Ok(()) + } + + // check if the source address is null, if so, set to an anonymous address + fn check_and_set_addr(&self) -> SocketAddrUnix { + let mut source_addr = { + let table = UNIX_TABLE.read(); + let addr = table + .get(self.get_sockethandle()) + .unwrap() + .lock() + .get_addr(); + addr + }; + if source_addr.sun_path.iter().all(|&c| c == 0) { + debug!("source addr is null, set to an anonymous address"); + source_addr = generate_anonymous_address(); + let mut binding = UNIX_TABLE.write(); + let socket_inner = binding.get_mut(self.get_sockethandle()).unwrap().lock(); + socket_inner.addr.lock().set_addr(&source_addr); + } + source_addr + } + /// Sends data to a specified address. - pub fn sendto(&self, _buf: &[u8], _addr: SocketAddrUnix) -> LinuxResult { - unimplemented!() + pub fn sendto(&self, buf: &[u8], addr: SocketAddrUnix) -> LinuxResult { + match self.unixsocket_type { + UnixSocketType::SockStream => unimplemented!(), + UnixSocketType::SockDgram => { + let source_addr = self.check_and_set_addr(); + let target_handle = { + let table = UNIX_TABLE.read(); + match table.find(|socket_inner| socket_inner.lock().get_addr() == addr) { + Some((handle, _)) => *handle, + None => return Err(LinuxError::ENOENT), + } + }; + + let target_socket = UNIX_TABLE + .read() + .get(target_handle) + .ok_or(LinuxError::ENOENT)? + .clone(); + let mut target_inner = target_socket.lock(); + + // check if the target socket is a datagram socket + if target_inner.socket_type != UnixSocketType::SockDgram { + return Err(LinuxError::EINVAL); + } + + // check if the target socket is bound to an address + let target_addr = target_inner.get_addr(); + if target_addr.sun_path.iter().all(|&c| c == 0) { + return Err(LinuxError::EINVAL); + } + + // check if the target socket is connected + if target_inner.datagram_queue.len() >= target_inner.buf.capacity() { + return Err(LinuxError::EAGAIN); + } + target_inner + .datagram_queue + .push_back((source_addr, buf.to_vec())); + target_inner + .datagram_queue + .push_back((source_addr, buf.to_vec())); + Ok(buf.len()) + } + UnixSocketType::SockSeqpacket => unimplemented!(), + } } /// Receives data from the socket and returns the sender's address. - pub fn recvfrom(&self, _buf: &mut [u8]) -> LinuxResult<(usize, Option)> { - unimplemented!() + pub fn recvfrom(&self, buf: &mut [u8]) -> LinuxResult<(usize, Option)> { + match self.unixsocket_type { + UnixSocketType::SockStream | UnixSocketType::SockSeqpacket => unimplemented!(), + UnixSocketType::SockDgram => { + loop { + let socket_inner = { + let table = UNIX_TABLE.read(); + table + .get(self.get_sockethandle()) + .ok_or(LinuxError::EBADF)? + .clone() + }; + let mut inner = socket_inner.lock(); + + // check if the buffer is empty, if not, copy data to buf + // if data is larger than buf, the remaining data will be truncated + if let Some((source_addr, data)) = inner.datagram_queue.pop_front() { + let len = buf.len().min(data.len()); + buf[..len].copy_from_slice(&data[..len]); + return Ok((len, Some(source_addr))); + } else { + // the buffer is empty + if self.is_nonblocking() { + return Err(LinuxError::EAGAIN); + } else { + // block until data is available + drop(inner); + yield_now(); + } + } + } + } + } } /// Listens for incoming connections on the socket. @@ -694,7 +901,10 @@ impl UnixSocket { /// Checks if the socket is in a listening state. pub fn is_listening(&self) -> bool { let now_state = self.get_state(); - matches!(now_state, UnixSocketStatus::Listening) + match now_state { + UnixSocketStatus::Listening => true, + _ => false, + } } /// Returns the socket type of the `UnixSocket`.