Skip to content

Commit

Permalink
initial refactoring using only task handles
Browse files Browse the repository at this point in the history
  • Loading branch information
rsachdeva committed Dec 24, 2024
1 parent 46e73e8 commit bb6c5d7
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 49 deletions.
9 changes: 2 additions & 7 deletions chatty-tcp/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chatty_tcp::listen::room::serve;
use chatty_tcp::listen::state::RoomState;
use chatty_types::config::{setup_tracing, Component::Server};
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
Expand All @@ -27,17 +27,12 @@ pub async fn main() -> Result<()> {
span.in_scope(|| info!("listening on {}", listening_on));

// Set up room state for use
let user_set = Mutex::new(HashSet::new());
// bounded channel
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
// task handles
let task_handles = Mutex::new(HashMap::new());

let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });

let mut connection_handles = Vec::new();

Expand Down
48 changes: 26 additions & 22 deletions chatty-tcp/src/listen/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ pub async fn process_command(
let command: ChatCommand = serde_json::from_str(&line)?;
match command {
ChatCommand::Join(username) => {
let mut users = room_state.user_set.lock().await;
let user_already_exist = users.contains(&username);
let user_already_exist =
room_state.task_handles.lock().await.contains_key(&username);

let chat_response = if !user_already_exist {
users.insert(username.clone());
info!("Users in room after addition: {:?}", users);
info!("Client {} joined as {}", addr, username);
let rx = room_state.tx.subscribe();
let send_task_handle = tokio::spawn(send_from_broadcast_channel_task(
writer.clone(),
Expand All @@ -58,6 +55,11 @@ pub async fn process_command(
.lock()
.await
.insert(username.clone(), send_task_handle);
info!(
"Users in room after addition: {:?}",
room_state.task_handles.lock().await.keys()
);
info!("Client {} joined as {}", addr, username);
send_to_broadcast_channel(
ChatResponse::Broadcast(ChatMemo {
username: username.clone(),
Expand Down Expand Up @@ -96,10 +98,7 @@ pub async fn process_command(
ChatCommand::Leave(username) => {
remove_username(username.clone(), room_state.clone()).await;
debug!("User {} has left", username);
if let Some(handle) = room_state.task_handles.lock().await.remove(&username) {
info!("Aborting background task for user: {}", username);
handle.abort();
}

debug!("User {} has left so sending broadcast message", username);
send_to_broadcast_channel(
ChatResponse::Broadcast(ChatMemo {
Expand All @@ -117,40 +116,45 @@ pub async fn process_command(
}

pub async fn remove_username(username: String, room_state: Arc<RoomState>) {
let mut users = room_state.user_set.lock().await;
users.remove(&username);
let mut lookup = room_state.task_handles.lock().await;
if let Some(handle) = lookup.remove(&username) {
info!("Aborting background task for user: {}", username);
handle.abort();
}
info!("User {} removed from room", username);
// list connected users
let users: Vec<String> = users.iter().cloned().collect();
let users: Vec<String> = lookup.keys().cloned().collect();
info!("Users in room after removal: {:?}", users);
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

#[tokio::test]
async fn test_remove_username() {
let mut user_set = HashSet::new();
user_set.insert("test_user".to_string());
user_set.insert("other_user".to_string());
let mut lookup_initial = HashMap::new();
let dummy_task: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
lookup_initial.insert("test_user".to_string(), dummy_task);
let dummy_task2: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
lookup_initial.insert("other_user".to_string(), dummy_task2);

let (tx, _) = broadcast::channel(100);
let room_state = Arc::new(RoomState {
user_set: Mutex::new(user_set),
tx,
task_handles: Mutex::new(HashMap::new()),
task_handles: Mutex::new(lookup_initial),
});

// Execute removal
remove_username("test_user".to_string(), room_state.clone()).await;

// Verify user was removed
let users = room_state.user_set.lock().await;
assert!(!users.contains("test_user"));
assert!(users.contains("other_user"));
assert_eq!(users.len(), 1);
let lookup = room_state.task_handles.lock().await;
assert!(!lookup.contains_key("test_user"));
assert!(lookup.contains_key("other_user"));
assert_eq!(lookup.len(), 1);
}
}
3 changes: 1 addition & 2 deletions chatty-tcp/src/listen/state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::listen::command::RoomError;
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

type TaskHandleMap = Mutex<HashMap<String, JoinHandle<Result<(), RoomError>>>>;

pub struct RoomState {
pub user_set: Mutex<HashSet<String>>,
pub tx: broadcast::Sender<ChatResponse>,
pub task_handles: TaskHandleMap,
}
26 changes: 8 additions & 18 deletions chatty-tcp/tests/client_server_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chatty_tcp::listen::state::RoomState;
use chatty_types::config::setup_tracing;
use chatty_types::config::Component::Server;
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
Expand All @@ -25,15 +25,10 @@ fn init_tracing_for_tests() {
async fn single_client() {
init_tracing_for_tests();
// Set up room state
let user_set = Mutex::new(HashSet::new());
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
let task_handles = Mutex::new(HashMap::new());

let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });

// Start the server in a background task
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
Expand Down Expand Up @@ -83,8 +78,8 @@ async fn single_client() {

// // Verify user is there
let room_state_for_removal = room_state.clone();
let users = room_state_for_removal.user_set.lock().await;
assert!(users.contains("alone"));
let lookup = room_state_for_removal.task_handles.lock().await;
assert!(lookup.contains_key("alone"));

// leave command
let leave_command = r#"{"Leave":"alone"}"#;
Expand All @@ -104,12 +99,7 @@ async fn multiple_clients() {
let task_handles = Mutex::new(HashMap::new());

// Set up room state
let user_set = Mutex::new(HashSet::new());
let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });
let state = room_state.clone();

// Start the server in a background task
Expand Down Expand Up @@ -194,9 +184,9 @@ async fn multiple_clients() {
let expected_message2 = r#"{"Broadcast":{"username":"carl","content":"Left"}}"#;
assert_eq!(broadcast_message, expected_message2);

let user_set = state.user_set.lock().await;
assert_eq!(user_set.len(), 1);
assert!(user_set.contains("david"));
let lookup = state.task_handles.lock().await;
assert_eq!(lookup.len(), 1);
assert!(lookup.contains_key("david"));

// Clean up
server_handle.abort();
Expand Down

0 comments on commit bb6c5d7

Please sign in to comment.