Skip to content

Commit

Permalink
Merge pull request #159 from lhw2002426/multiprocess-loopback
Browse files Browse the repository at this point in the history
add poll stete POLLHUP
  • Loading branch information
ken4647 authored Dec 21, 2024
2 parents a67be9b + 6936553 commit 42261fd
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 2 deletions.
6 changes: 6 additions & 0 deletions api/ruxos_posix_api/src/imp/io_mpx/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ impl EpollInstance {
events[events_num].data = ev.data;
events_num += 1;
}

if state.pollhup {
events[events_num].events = ctypes::EPOLLHUP;
events[events_num].data = ev.data;
events_num += 1;
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions api/ruxos_posix_api/src/imp/io_mpx/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ fn poll_all(fds: &mut [ctypes::pollfd]) -> LinuxResult<usize> {
*revents |= ctypes::EPOLLOUT as i16;
events_num += 1;
}

if state.pollhup {
*revents |= ctypes::EPOLLHUP as i16;
events_num += 1;
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions api/ruxos_posix_api/src/imp/io_mpx/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl FdSets {
unsafe { set_fd_set(res_write_fds, fd) };
res_num += 1;
}
if state.pollhup {
unsafe { set_fd_set(res_except_fds, fd) };
res_num += 1;
}
}
Err(e) => {
debug!(" except: {} {:?}", fd, e);
Expand Down
4 changes: 3 additions & 1 deletion api/ruxos_posix_api/src/imp/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum RingBufferStatus {
Normal,
}

const RING_BUFFER_SIZE: usize = 256;
const RING_BUFFER_SIZE: usize = ruxconfig::PIPE_BUFFER_SIZE;

pub struct PipeRingBuffer {
arr: [u8; RING_BUFFER_SIZE],
Expand Down Expand Up @@ -210,10 +210,12 @@ impl FileLike for Pipe {
}

fn poll(&self) -> LinuxResult<PollState> {
let write_end_count = Arc::weak_count(&self.buffer);
let buf = self.buffer.lock();
Ok(PollState {
readable: self.readable() && buf.available_read() > 0,
writable: self.writable() && buf.available_write() > 0,
pollhup: self.write_end_close(),
})
}

Expand Down
2 changes: 2 additions & 0 deletions api/ruxos_posix_api/src/imp/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl ruxfdtable::FileLike for Stdin {
Ok(PollState {
readable: true,
writable: true,
pollhup: false,
})
}

Expand Down Expand Up @@ -204,6 +205,7 @@ impl ruxfdtable::FileLike for Stdout {
Ok(PollState {
readable: true,
writable: true,
pollhup: false,
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/axio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,6 @@ pub struct PollState {
pub readable: bool,
/// Object can be writen now.
pub writable: bool,
/// Object is closed (by remote) now.
pub pollhup: bool,
}
3 changes: 3 additions & 0 deletions modules/ruxconfig/defconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ smp = "1"

# Maximum number of keys per thread.
pthread-key-max = "1024"

# Pipe channel bufer size.
pipe-buffer-size = "0x10000"
4 changes: 3 additions & 1 deletion modules/ruxnet/src/lwip_impl/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use lwip_rust::bindings::{
err_enum_t_ERR_MEM, err_enum_t_ERR_OK, err_enum_t_ERR_USE, err_enum_t_ERR_VAL, err_t,
ip_addr_t, pbuf, pbuf_free, tcp_accept, tcp_arg, tcp_bind, tcp_close, tcp_connect,
tcp_listen_with_backlog, tcp_new, tcp_output, tcp_pcb, tcp_recv, tcp_recved, tcp_state_CLOSED,
tcp_state_LISTEN, tcp_write, TCP_DEFAULT_LISTEN_BACKLOG, TCP_MSS,
tcp_state_CLOSE_WAIT, tcp_state_LISTEN, tcp_write, TCP_DEFAULT_LISTEN_BACKLOG, TCP_MSS,
};
use ruxtask::yield_now;

Expand Down Expand Up @@ -475,13 +475,15 @@ impl TcpSocket {
Ok(PollState {
readable: self.inner.accept_queue.lock().len() != 0,
writable: false,
pollhup: false,
})
} else {
let test = self.inner.recv_queue.lock().len();
// stream
Ok(PollState {
readable: self.inner.recv_queue.lock().len() != 0,
writable: true,
pollhup: unsafe { (*self.pcb.get()).state } == tcp_state_CLOSE_WAIT,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions modules/ruxnet/src/lwip_impl/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ impl UdpSocket {
Ok(PollState {
readable: self.inner.recv_queue.lock().len() != 0,
writable: true,
pollhup: false,
})
}
}
Expand Down
7 changes: 7 additions & 0 deletions modules/ruxnet/src/smoltcp_impl/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl TcpSocket {
_ => Ok(PollState {
readable: false,
writable: false,
pollhup: false,
}),
}
}
Expand Down Expand Up @@ -482,16 +483,21 @@ impl TcpSocket {
Ok(PollState {
readable: false,
writable,
pollhup: false,
})
}

fn poll_stream(&self) -> AxResult<PollState> {
// SAFETY: `self.handle` should be initialized in a connected socket.
let handle = unsafe { self.handle.get().read().unwrap() };
let pollhup = SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
socket.state() == tcp::State::CloseWait
});
SOCKET_SET.with_socket::<tcp::Socket, _, _>(handle, |socket| {
Ok(PollState {
readable: !socket.may_recv() || socket.can_recv(),
writable: !socket.may_send() || socket.can_send(),
pollhup,
})
})
}
Expand All @@ -502,6 +508,7 @@ impl TcpSocket {
Ok(PollState {
readable: LISTEN_TABLE.can_accept(local_addr.port)?,
writable: false,
pollhup: false,
})
}

Expand Down
2 changes: 2 additions & 0 deletions modules/ruxnet/src/smoltcp_impl/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ impl UdpSocket {
return Ok(PollState {
readable: false,
writable: false,
pollhup: false,
});
}
SOCKET_SET.with_socket_mut::<udp::Socket, _, _>(self.handle, |socket| {
Ok(PollState {
readable: socket.can_recv(),
writable: socket.can_send(),
pollhup: false,
})
})
}
Expand Down
17 changes: 17 additions & 0 deletions modules/ruxnet/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ impl UnixSocket {
Ok(PollState {
readable: false,
writable,
pollhup: false,
})
}

Expand All @@ -480,11 +481,25 @@ impl UnixSocket {
match now_state {
UnixSocketStatus::Connecting => self.poll_connect(),
UnixSocketStatus::Connected => {
let remote_is_close = {
let remote_handle = self.get_peerhandle();
match remote_handle {
Some(handle) => {
let mut binding = UNIX_TABLE.write();
let mut remote_status = binding.get_mut(handle).unwrap().lock().get_state();
remote_status == UnixSocketStatus::Closed
}
None => {
return Err(LinuxError::ENOTCONN);
}
}
};
let mut binding = UNIX_TABLE.write();
let mut socket_inner = binding.get_mut(self.get_sockethandle()).unwrap().lock();
Ok(PollState {
readable: !socket_inner.may_recv() || socket_inner.can_recv(),
writable: !socket_inner.may_send() || socket_inner.can_send(),
pollhup: remote_is_close,
})
}
UnixSocketStatus::Listening => {
Expand All @@ -493,11 +508,13 @@ impl UnixSocket {
Ok(PollState {
readable: socket_inner.can_accept(),
writable: false,
pollhup: false,
})
}
_ => Ok(PollState {
readable: false,
writable: false,
pollhup: false,
}),
}
}
Expand Down
2 changes: 2 additions & 0 deletions modules/ruxtask/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl FileLike for File {
Ok(PollState {
readable: true,
writable: true,
pollhup: false,
})
}

Expand Down Expand Up @@ -197,6 +198,7 @@ impl FileLike for Directory {
Ok(PollState {
readable: true,
writable: true,
pollhup: false,
})
}

Expand Down
3 changes: 3 additions & 0 deletions platforms/aarch64-qemu-virt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ gicd-paddr = "0x0800_0000"

# PSCI
psci-method = "hvc"

# Pipe channel bufer size.
pipe-buffer-size = "0x10000"
3 changes: 3 additions & 0 deletions platforms/aarch64-raspi4.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ uart-irq = "0x79"
# GIC Address
gicc-paddr = "0xFF84_2000"
gicd-paddr = "0xFF84_1000"

# Pipe channel bufer size.
pipe-buffer-size = "0x10000"
3 changes: 3 additions & 0 deletions platforms/riscv64-qemu-virt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,6 @@ pci-ranges = [

# Timer interrupt frequency in Hz.
timer-frequency = "10_000_000" # 10MHz

# Pipe channel bufer size.
pipe-buffer-size = "0x10000"
3 changes: 3 additions & 0 deletions platforms/x86_64-qemu-q35.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ pci-ranges = []

# Timer interrupt frequencyin Hz.
timer-frequency = "4_000_000_000" # 4.0GHz

# Pipe channel bufer size.
pipe-buffer-size = "0x10000"

0 comments on commit 42261fd

Please sign in to comment.