Fix tokio task leak #151

Open · wants to merge 5 commits into main
Conversation

@rodoufu rodoufu commented Feb 27, 2025

While running agent version v2.12.1, it crashed after some time with an out-of-memory error and had to be killed by the OS. The machine I was using has 48 GB of RAM.

This was happening on both mainnet and testnet.

I did not see anything particularly special about the resources it was using:
[screenshot: tokio_console_resources-pr]

But the number of tasks seems large:
[screenshot: tokio_console_threads-pr]

One can see 7,721 tasks, many of which have been idle for some time. Observing it over time, the number of tasks kept increasing until the binary was killed.
The idle tasks are created here https://github.com/pyth-network/pyth-agent/blob/main/src/agent/services/oracle.rs#L132, as can be seen in the image, where the subscriber is handling handle_price_account_update.
That line creates tokio tasks without keeping track of the JoinHandle; when tasks are created faster than they are .awaited, this leads to a leak.

After the proposed change I can see a much more comfortable number of tasks:
[screenshot: tokio-console_threads_after-pr]
The number of tasks is now stable at around 100 (114 in the attached image).
It is worth mentioning that I used 100 worker tasks to wait for the previously leaked ones to finish, so this number can be much smaller with a different configuration.

To reproduce the tokio-console output, one can follow the instructions in https://github.com/tokio-rs/console, which are simple:

  • Install tokio-console with cargo install --locked tokio-console
  • Add console-subscriber = "0.3.0" as a dependency
  • Add console_subscriber::init(); as the first line of the main function
  • Run the binary with RUSTFLAGS="--cfg tokio_unstable" cargo run --bin agent -- --config <config file path>
  • Run tokio-console to watch its data


rodoufu commented Mar 5, 2025

Everything seems to be green this time.

$ pre-commit --version && pre-commit run --all-files && echo $?
pre-commit 4.1.0

Trim Trailing Whitespace.................................................Passed
Fix End of Files.........................................................Passed
Check for added large files..............................................Passed
rustfmt..................................................................Passed
Integration Test Artifact Checksums......................................Passed

0


rodoufu commented Mar 5, 2025

I also have another PR open for this project, and I would love to hear your thoughts on it.

cc @aditya520 @ali-bahjati

@rodoufu rodoufu force-pushed the fixTokioTaskLeak branch from 208d34f to ff9eeba Compare March 5, 2025 14:10
Comment on lines +100 to +114
```rust
let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
for _ in 0..number_of_workers {
    let receiver = receiver.clone();
    handles.push(tokio::spawn(async move {
        loop {
            let mut receiver = receiver.lock().await;
            if let Some(task) = receiver.recv().await {
                drop(receiver);
                if let Err(err) = task.await {
                    tracing::error!(%err, "error running price update");
                }
            }
        }
    }));
}
```
Contributor review:
This implementation basically limits the amount of tasks that can be run at the same time. I think a more straightforward way for doing this is to use a semaphore. You can create a semaphore permit before spawning a task and move it inside the task. When the task is finished, the permit will automatically be released, allowing more tasks to be spawned. It's simpler because it doesn't require an additional channel and doesn't require using join handles.

```rust
sender
    .send(tokio::spawn(async move {
        if let Err(err) =
            Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
```
Contributor review:
I wonder why it's possible to accumulate a lot of long-lived tasks. Maybe we should add a timeout to Oracle::handle_price_account_update? Especially considering that now if tasks become stuck, handling price updates can halt completely.

```rust
        }
    }))
    .await
    .context("sending handle_price_account_update task to worker")?;
```
Contributor review:

I presume we were using spawn here to make sure subscriber() keeps running while we handle the updates. With this change, subscriber() will potentially wait at this .await point for previous tasks to complete. This can delay handling new updates. It can also interfere with the pubsub client, because we won't be fetching new updates while waiting for old tasks to complete. (It's probably still better than spawning an unlimited number of tasks.)
