From ad43d65e75cc5385a50ed72ab96248d27d95342b Mon Sep 17 00:00:00 2001 From: Kevin Dolan Date: Wed, 23 Oct 2024 18:08:29 +0100 Subject: [PATCH 1/2] Fixed error in timeout-from-first-chunk logic. --- dorado/read_pipeline/BasecallerNode.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dorado/read_pipeline/BasecallerNode.cpp b/dorado/read_pipeline/BasecallerNode.cpp index 36a30abc2..7178f0708 100644 --- a/dorado/read_pipeline/BasecallerNode.cpp +++ b/dorado/read_pipeline/BasecallerNode.cpp @@ -317,7 +317,9 @@ void BasecallerNode::basecall_worker_thread(int worker_id) { m_batched_chunks[worker_id].push_back(std::move(chunk)); - if (m_batched_chunks.size() == 1 || !measure_timeout_from_first_chunk) { + if (m_batched_chunks[worker_id].size() == 1 || !measure_timeout_from_first_chunk) { + // If we're measuring the timeout from the first chunk, we only reset the timer + // if this is the first chunk to be added to the buffer. chunk_reserve_time = std::chrono::system_clock::now(); } } From 56c47b1d4880e3fc152410fe29f1fe48b4641c02 Mon Sep 17 00:00:00 2001 From: Kevin Dolan Date: Thu, 24 Oct 2024 09:48:30 +0100 Subject: [PATCH 2/2] Minor change to make code clearer. --- dorado/read_pipeline/BasecallerNode.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dorado/read_pipeline/BasecallerNode.cpp b/dorado/read_pipeline/BasecallerNode.cpp index 7178f0708..354bd6983 100644 --- a/dorado/read_pipeline/BasecallerNode.cpp +++ b/dorado/read_pipeline/BasecallerNode.cpp @@ -256,6 +256,7 @@ void BasecallerNode::basecall_worker_thread(int worker_id) { const size_t chunk_size = m_model_runners[worker_id]->chunk_size(); const bool is_low_latency = m_model_runners[worker_id]->is_low_latency(); const int chunk_queue_idx = worker_id % int(m_chunk_in_queues.size()); + auto &worker_chunks = m_batched_chunks[worker_id]; const int batch_timeout_ms = (is_low_latency && m_low_latency_batch_timeout_ms != 0) ? 
m_low_latency_batch_timeout_ms @@ -278,7 +279,7 @@ void BasecallerNode::basecall_worker_thread(int worker_id) { if (pop_status == utils::AsyncQueueStatus::Timeout) { // try_pop_until timed out without getting a new chunk. - if (!m_batched_chunks[worker_id].empty()) { + if (!worker_chunks.empty()) { // get scores for whatever chunks are available. basecall_current_batch(worker_id); } @@ -289,7 +290,7 @@ void BasecallerNode::basecall_worker_thread(int worker_id) { // There's chunks to get_scores, so let's add them to our input tensor // FIXME -- it should not be possible to for this condition to be untrue. - if (m_batched_chunks[worker_id].size() != batch_size) { + if (worker_chunks.size() != batch_size) { // Copy the chunk into the input tensor auto &source_read = chunk->owning_read->read; @@ -312,26 +313,26 @@ } // Insert the chunk in the input tensor - m_model_runners[worker_id]->accept_chunk( - static_cast<int>(m_batched_chunks[worker_id].size()), input_slice); + m_model_runners[worker_id]->accept_chunk(static_cast<int>(worker_chunks.size()), + input_slice); - m_batched_chunks[worker_id].push_back(std::move(chunk)); + worker_chunks.push_back(std::move(chunk)); - if (m_batched_chunks[worker_id].size() == 1 || !measure_timeout_from_first_chunk) { + if (worker_chunks.size() == 1 || !measure_timeout_from_first_chunk) { // If we're measuring the timeout from the first chunk, we only reset the timer // if this is the first chunk to be added to the buffer. chunk_reserve_time = std::chrono::system_clock::now(); } } - if (m_batched_chunks[worker_id].size() == batch_size) { + if (worker_chunks.size() == batch_size) { // Input tensor is full, let's get_scores. basecall_current_batch(worker_id); chunk_reserve_time = std::chrono::system_clock::now(); } } - if (!m_batched_chunks[worker_id].empty()) { + if (!worker_chunks.empty()) { basecall_current_batch(worker_id); }