Fix tokio task leak #151
base: main
Conversation
Everything seems to be green this time.

$ pre-commit --version && pre-commit run --all-files && echo $?
pre-commit 4.1.0
Trim Trailing Whitespace.................................................Passed
Fix End of Files.........................................................Passed
Check for added large files..............................................Passed
rustfmt..................................................................Passed
Integration Test Artifact Checksums......................................Passed
0
I also have this other MR for the project, for which I would love to hear your thoughts.
208d34f to ff9eeba
let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
for _ in 0..number_of_workers {
    let receiver = receiver.clone();
    handles.push(tokio::spawn(async move {
        loop {
            let mut receiver = receiver.lock().await;
            if let Some(task) = receiver.recv().await {
                drop(receiver);
                if let Err(err) = task.await {
                    tracing::error!(%err, "error running price update");
                }
            }
        }
    }));
}
This implementation basically limits the number of tasks that can run at the same time. I think a more straightforward way to do this is to use a semaphore. You can acquire a semaphore permit before spawning a task and move it inside the task. When the task finishes, the permit is automatically released, allowing more tasks to be spawned. It's simpler because it doesn't require an additional channel and doesn't require using join handles.
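A minimal sketch of that semaphore approach, assuming a hypothetical limit of 100 concurrent tasks and a placeholder `do_work` in place of the real price-update handling:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Hypothetical cap on concurrently running tasks; in the agent this
    // would come from configuration rather than a constant.
    let semaphore = Arc::new(Semaphore::new(100));

    for i in 0..1_000u64 {
        // Wait here until a permit is free; this limits how many tasks are
        // alive at once without an extra channel or join handles.
        let permit = semaphore.clone().acquire_owned().await.unwrap();

        tokio::spawn(async move {
            do_work(i).await;
            // Dropping the permit when the task ends releases the slot.
            drop(permit);
        });
    }
}

// Placeholder for the real price-update handling.
async fn do_work(i: u64) {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    let _ = i;
}
```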
sender
    .send(tokio::spawn(async move {
        if let Err(err) =
            Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
I wonder why it's possible to accumulate a lot of long-lived tasks. Maybe we should add a timeout to Oracle::handle_price_account_update? Especially considering that now, if tasks become stuck, handling price updates can halt completely.
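A rough sketch of what such a timeout could look like; the 30-second value, the wrapper function, and the no-op stand-in for the real call are assumptions for illustration:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Hypothetical cap on how long a single price update may take.
const UPDATE_TIMEOUT: Duration = Duration::from_secs(30);

async fn handle_with_timeout() -> anyhow::Result<()> {
    match timeout(UPDATE_TIMEOUT, handle_price_account_update()).await {
        Ok(result) => result,
        // `timeout` yields Err(Elapsed) when the future does not finish in time.
        Err(_elapsed) => anyhow::bail!("handle_price_account_update timed out"),
    }
}

// No-op stand-in for Oracle::handle_price_account_update(&*state, network, &pubkey, &account).
async fn handle_price_account_update() -> anyhow::Result<()> {
    Ok(())
}
```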
        }
    }))
    .await
    .context("sending handle_price_account_update task to worker")?;
I presume we were using spawn here to make sure subscriber() keeps running while we handle the updates. With this change, subscriber() will potentially wait at this .await point for previous tasks to complete. This can delay handling new updates. It can also interfere with the pubsub client because we won't be fetching new updates while we're waiting for old tasks to complete. (It's probably still better than spawning an unlimited number of tasks.)
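For reference, the waiting described here comes from the bounded mpsc channel itself; a small self-contained sketch (the capacity of 2, the sleep, and the item count are arbitrary):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel with capacity 2: once two items are queued and not yet
    // picked up, the next `send(...).await` suspends the producer (standing
    // in for subscriber()) until the consumer frees a slot.
    let (sender, mut receiver) = mpsc::channel::<u32>(2);

    let consumer = tokio::spawn(async move {
        while let Some(item) = receiver.recv().await {
            // Slow consumer, simulating a long-running price update.
            tokio::time::sleep(Duration::from_millis(500)).await;
            println!("handled {item}");
        }
    });

    for i in 0..5 {
        // This await blocks whenever the channel is full, delaying new sends.
        sender.send(i).await.unwrap();
        println!("queued {i}");
    }

    // Closing the sender lets the consumer loop end once the queue drains.
    drop(sender);
    consumer.await.unwrap();
}
```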
While running agent version v.2.12.1, it crashed after some time with an out-of-memory issue, having to be killed by the OS. The machine I was using has 48GB of RAM.
It was happening for both mainnet and testnet.
I did not see anything particularly special about the resources it was using:

But the number of tasks seems large.

One can see 7721 tasks, a lot of which have been idle for some time.
Observing it over time, the number of tasks kept increasing until the binary was killed.
The idle tasks are created here: https://github.com/pyth-network/pyth-agent/blob/main/src/agent/services/oracle.rs#L132, as can be seen in the image, where the subscriber is handling handle_price_account_update. That line is creating tokio tasks without keeping track of the task handle (JoinHandle); when tasks are created faster than they are .awaited, this can lead to a leak. After the proposed change I can see a much more comfortable number of tasks:

The number of tasks is now stable at around 100 (114 in the attached image).
It is worth mentioning that I used 100 worker tasks to let the previously leaked ones finish, so this number could be much smaller with a different configuration.
To reproduce the tokio-console output, one can follow the instructions in https://github.com/tokio-rs/console, which are basic:

- Install the console: `cargo install --locked tokio-console`
- Add `console-subscriber = "0.3.0"` as a dependency
- Add `console_subscriber::init();` as the first line in the `main` function
- Run the agent with `RUSTFLAGS="--cfg tokio_unstable" cargo run --bin agent -- --config <config file path>`
- Run `tokio-console` to watch its data