From 2d5e2dc784940bfacbfac1d1d84672aa87ca3eea Mon Sep 17 00:00:00 2001 From: William Malpica Date: Fri, 16 Oct 2020 08:57:36 -0500 Subject: [PATCH 1/2] clearing array caches after PartwiseJoin is done --- .../logic_controllers/BatchJoinProcessing.h | 4 ++++ .../execution_graph/logic_controllers/CacheMachine.cpp | 5 +---- .../src/execution_graph/logic_controllers/CacheMachine.h | 8 ++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h index d97444be2..f58c48a01 100644 --- a/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h +++ b/engine/src/execution_graph/logic_controllers/BatchJoinProcessing.h @@ -412,6 +412,10 @@ class PartwiseJoin : public kernel { "duration"_a=timer.elapsed_time(), "kernel_id"_a=this->get_id()); + // these are intra kernel caches. We want to make sure they are empty before we finish. + this->leftArrayCache->clear(); + this->rightArrayCache->clear(); + return kstatus::proceed; } diff --git a/engine/src/execution_graph/logic_controllers/CacheMachine.cpp b/engine/src/execution_graph/logic_controllers/CacheMachine.cpp index a28ead7f1..f3069a6bf 100644 --- a/engine/src/execution_graph/logic_controllers/CacheMachine.cpp +++ b/engine/src/execution_graph/logic_controllers/CacheMachine.cpp @@ -200,10 +200,7 @@ void CacheMachine::put(size_t message_id, std::unique_ptr message_data; - while(message_data = waitingCache->pop_or_wait()) { - printf("...cleaning cache\n"); - } + auto messages = this->waitingCache->get_all(); this->waitingCache->finish(); } diff --git a/engine/src/execution_graph/logic_controllers/CacheMachine.h b/engine/src/execution_graph/logic_controllers/CacheMachine.h index 05768b80f..397626e3f 100644 --- a/engine/src/execution_graph/logic_controllers/CacheMachine.h +++ b/engine/src/execution_graph/logic_controllers/CacheMachine.h @@ -766,6 +766,14 @@ class WaitingQueue { return std::move(data); } + /** + * gets all the messages + */ + std::vector get_all(){ + std::unique_lock lock(mutex_); + return get_all_unsafe(); + } + /** * Waits until all messages are ready then returns all of them. * You should never call this function more than once on a WaitingQueue else From e53274515e11527ee66a3753ed3004a10df9146e Mon Sep 17 00:00:00 2001 From: William Malpica Date: Fri, 16 Oct 2020 08:59:19 -0500 Subject: [PATCH 2/2] updated CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9e64fffd..999abd60d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ - #1073 Fixed parseSchemaPython can throw exceptions - #1074: Remove lock inside grow() method from PinnedBufferProvider - #1071 Fix crash when loading an empty folder +- #1085 Fixed intra-query memory leak in joins. Fixed by clearing array caches after PartwiseJoin is done # BlazingSQL 0.15.0 (August 31, 2020)