Skip to content

Commit

Permalink
Allocate thread for apiserver/scheduler.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <klausm@nvidia.com>
  • Loading branch information
k82cn committed Jan 15, 2025
1 parent 4f674ba commit 136d571
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
21 changes: 16 additions & 5 deletions session_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,30 @@ async fn main() -> Result<(), FlameError> {

log::info!("flame-session-manager is starting ...");

let mut threads = HashMap::new();
let mut handlers = vec![];

let storage = storage::new_ptr(&ctx.storage).await?;

// Load data from engine, e.g. sqlite.
storage.load_data().await?;

threads.insert("scheduler", scheduler::new(storage.clone()));
threads.insert("apiserver", apiserver::new(storage.clone()));
{
let storage = storage.clone();
let ctx = ctx.clone();
let handler = tokio::spawn(async move {
let apiserver = apiserver::new(storage);
apiserver.run(ctx).await
});
handlers.push(handler);
}

for thread in threads.values() {
let handler = thread.run(ctx.clone());
{
let storage = storage.clone();
let ctx = ctx.clone();
let handler = tokio::spawn(async move {
let scheduler = scheduler::new(storage);
scheduler.run(ctx).await
});
handlers.push(handler);
}

Expand Down
17 changes: 17 additions & 0 deletions session_manager/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ impl SnapShot {
exec_index: Arc::new(Mutex::new(HashMap::new())),
}
}

pub fn debug(&self) -> Result<(), FlameError> {
if log::log_enabled!(log::Level::Debug) {
let ssn_num = {
let ssns = lock_ptr!(self.sessions)?;
ssns.len()
};
let exe_num = {
let exes = lock_ptr!(self.executors)?;
exes.len()
};

log::debug!("Session: <{ssn_num}>, Executor: <{exe_num}>");
}

Ok(())
}
}

#[derive(Debug, Default, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions session_manager/src/scheduler/actions/allocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl Action for AllocateAction {
trace_fn!("AllocateAction::execute");
let ss = ctx.snapshot.clone();

ss.debug()?;

let mut open_ssns = BinaryHeap::new(ssn_order_fn(ctx));
let mut idle_execs = Vec::new();

Expand Down
2 changes: 2 additions & 0 deletions session_manager/src/scheduler/actions/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl Action for BackfillAction {
trace_fn!("BackfillAction::execute");
let ss = ctx.snapshot.clone();

ss.debug()?;

let mut open_ssns = BinaryHeap::new(ssn_order_fn(ctx));
let mut idle_execs = Vec::new();

Expand Down

0 comments on commit 136d571

Please sign in to comment.