From aa09b57bcc1cee717f41a0a51c674badd8a11ed7 Mon Sep 17 00:00:00 2001 From: wiryls <7984500+wiryls@users.noreply.github.com> Date: Tue, 26 Mar 2024 16:02:43 +0800 Subject: [PATCH 1/4] refactor: rewrite AsyncResourcePool --- .../core/allocator/DIPUAsyncResourcePool.h | 70 ------------------- .../core/allocator/DIPUBFCachingAllocator.cpp | 38 +++++----- .../core/allocator/DIPUBSCachingAllocator.cpp | 29 ++++---- .../core/allocator/DIPUCachingAllocator.h | 17 +++-- .../allocator/DIPURawCachingAllocator.cpp | 34 ++++----- .../runtime/core/allocator/DIPUSpinMutex.h | 38 ---------- .../core/allocator/async_resource_pool.h | 60 ++++++++++++++++ 7 files changed, 114 insertions(+), 172 deletions(-) delete mode 100644 dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUAsyncResourcePool.h delete mode 100644 dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUSpinMutex.h create mode 100644 dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUAsyncResourcePool.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUAsyncResourcePool.h deleted file mode 100644 index 4533ac3fa..000000000 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUAsyncResourcePool.h +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2023, DeepLink. -#pragma once - -#include -#include -#include - -#include "csrc_dipu/runtime/core/DIPUEvent.h" - -namespace dipu { - -constexpr size_t kMaxAsyncResourcePoolLength = 3; - -template -class AsyncResourcePool { - public: - virtual void add(const T& t, std::deque& events) = 0; - virtual T get() = 0; - virtual bool ready() const = 0; - virtual bool empty() const = 0; - virtual size_t size() const = 0; -}; - -template -class AsyncResourcePoolImpl : public AsyncResourcePool { - using Res = std::tuple>; - std::deque list_; - using mutex_t = std::mutex; - mutable mutex_t mutex_; - - public: - void add(const T& t, std::deque& events) override { - std::lock_guard lk(mutex_); - list_.emplace_back(t, std::move(events)); - } - - T get() override { - std::lock_guard lk(mutex_); - T t = std::get<0>(list_.front()); - list_.pop_front(); - return t; - } - - bool empty() const override { - std::lock_guard lk(mutex_); - return list_.empty(); - } - - bool ready() const override { - std::lock_guard lk(mutex_); - if (list_.empty()) { - return false; - } - - for (auto& item : std::get<1>(list_.front())) { - if (!item.query()) { - return false; - } - } - - return true; - } - - size_t size() const override { - std::lock_guard lk(mutex_); - return list_.size(); - } -}; - -} // namespace dipu diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp index cd7e191f1..d77c26788 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp @@ -8,7 +8,6 @@ #include #include "DIPUCachingAllocator.h" -#include "DIPUSpinMutex.h" namespace dipu { @@ -131,7 +130,7 @@ class BFCachingAllocatorImpl { using StreamSetHandle = std::unique_ptr; std::vector streamSets_; - using mutex_t = SpinMutex; + using mutex_t = std::mutex; mutable mutex_t mut_; static size_t roundBytes(size_t nbytes) { @@ -409,35 +408,37 @@ class BFCachingAllocator : public CacheAllocator { private: void restore() const { std::lock_guard lk(resource_pool_mutex_); - while (async_mem_pool()->ready()) { - const auto block = async_mem_pool()->get(); - void* ptr = std::get<0>(block); - int id = static_cast(std::get<1>(block)); + + auto& pool = *async_mem_pool(); + for (auto item = pool.pop(); item; item = pool.pop()) { + auto [ptr, id] = item.value(); DIPU_DEBUG_ALLOCATOR( 8, "BFCachingAllocator: " << __FUNCTION__ << " ,ptr:" << ptr << " ,id:" << id << " ,allocator:" << this << ", device:" << device() << ", async_pool.size:" << async_mem_pool()->size()); - impl->releaseRaw(ptr, id); + impl->releaseRaw(ptr, static_cast(id)); } + set_memory_reserved(impl->memory_reserved()); } void empty_resource_pool() const { std::lock_guard lk(resource_pool_mutex_); - while (!async_mem_pool()->empty()) { - if (!async_mem_pool()->ready()) { + auto& pool = *async_mem_pool(); + while (not pool.empty()) { + auto item = pool.pop(); + if (not item) { std::this_thread::yield(); continue; } - const auto block = async_mem_pool()->get(); - void* ptr = std::get<0>(block); - int id = static_cast(std::get<1>(block)); + + auto [ptr, id] = item.value(); DIPU_DEBUG_ALLOCATOR( 8, "BFCachingAllocator: " << __FUNCTION__ << " ,ptr:" << ptr << " ,id:" << id << " ,allocator:" << this << ", device:" << device()); - impl->releaseRaw(ptr, id); + impl->releaseRaw(ptr, static_cast(id)); } } @@ -481,15 +482,8 @@ class BFCachingAllocator : public CacheAllocator { << ", device:" << allocator_->device()); if (allocator_->impl) { if (ptr()) { - std::deque events; - for (auto const& stream : streams()) { - events.emplace_back(); - DIPU_DEBUG_ALLOCATOR(8, "BFCachingAllocator: record to stream:" - << stream.rawstream()); - events.back().record(stream); - } - allocator_->async_mem_pool()->add(std::make_tuple(ptr(), id_), - events); + allocator_->async_mem_pool()->put(std::make_tuple(ptr(), id_), + streams_to_events()); allocator_->set_memory_allocated(allocator_->memory_allocated() - nbytes_); } diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp index fd860d32d..02ab3bf28 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp @@ -140,10 +140,9 @@ class BSCachingAllocator : public CacheAllocator { void empty_resource_pool() const { DIPU_DEBUG_ALLOCATOR( 8, "BSCachingAllocator::empty_resource_pool ,allocator:" << this); - while (!async_mem_pool()->empty()) { - if (async_mem_pool()->ready()) { - flush_mem_pool(); - } else { + + while (not async_mem_pool()->empty()) { + if (not flush_mem_pool()) { std::this_thread::yield(); } } @@ -179,13 +178,17 @@ class BSCachingAllocator : public CacheAllocator { void release_all_memory() const override { release_all_memory_impl(); } - void flush_mem_pool() const { + bool flush_mem_pool() const { DIPU_DEBUG_ALLOCATOR( 8, "BSCachingAllocator::flush_mem_pool allocator:" << this); - while (async_mem_pool()->ready()) { - auto mem = async_mem_pool()->get(); - restore(std::get<1>(mem), std::get<0>(mem)); + + auto& pool = *async_mem_pool(); + auto done = false; + for (auto item = pool.pop(); item; item = pool.pop(), done = true) { + auto [ptr, size] = item.value(); + restore(size, ptr); } + return done; } struct Context : public DataPtrContextBase { @@ -199,14 +202,8 @@ class BSCachingAllocator : public CacheAllocator { << ", ptr:" << ptr() << ", size_:" << size()); if (allocator_->impl) { - std::deque events; - for (const auto& item : streams()) { - events.emplace_back(); - events.back().record(item); - } - - allocator_->async_mem_pool()->add(std::make_tuple(ptr(), size()), - events); + allocator_->async_mem_pool()->put(std::make_tuple(ptr(), size()), + streams_to_events()); allocator_->set_memory_allocated(allocator_->memory_allocated() - real_size_); allocator_->flush_mem_pool(); diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h index 87d608375..3cdb5bfc0 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h @@ -5,9 +5,9 @@ #include #include -#include "DIPUAsyncResourcePool.h" #include "DIPUCachingAllocatorUtils.h" #include "DIPURawAllocator.h" +#include "async_resource_pool.h" namespace dipu { @@ -42,7 +42,7 @@ const MemoryAlignmentStrategy* getMemoryAlignmentStrategy(); void setMemoryAlignmentStrategy( const MemoryAlignmentStrategy* memoryAlignStrategy); -using AsyncMemPool = AsyncResourcePool>; +using AsyncMemPool = AsyncResourceQueue>; class MemStats { private: @@ -148,6 +148,15 @@ class DIPU_API CacheAllocator : public c10::Allocator, public MemStats { ska::flat_hash_set& streams() { return streams_; } + std::vector streams_to_events() const { + auto index = std::size_t{}; + auto events = std::vector(streams_.size()); + for (auto& stream : streams_) { + events[index++].record(stream); + } + return events; + } + const CacheAllocator* allocator() { return allocator_; } void* ptr() { return ptr_; } @@ -233,9 +242,7 @@ c10::Allocator* get_allocator(int device_id, c10::Allocator* raw_allocator) { namespace name##device_type { \ static allocator_details::RawAllocator::type \ raw_allocator; \ - using AsyncMemPool = \ - AsyncResourcePoolImpl, \ - at::DeviceType::device_type, priority>; \ + using AsyncMemPool = AsyncResourceQueue>; \ static const std::function allocator_get_fn = \ std::bind( \ allocator_details::get_allocator, \ diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp index f4ea9422a..4a3e59cfa 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp @@ -17,17 +17,13 @@ class RawCachingAllocator : public CacheAllocator { Context(const CacheAllocator* allocator, void* ptr, size_t size, size_t real_size) : DataPtrContextBase(allocator, ptr, size), real_size_(real_size) {} + ~Context() { - std::deque events; - for (const auto& item : streams()) { - events.emplace_back(); - events.back().record(item); - } - auto allocator_ = static_cast(allocator()); - allocator_->async_mem_pool()->add(std::make_tuple(ptr(), size()), events); - allocator_->set_memory_allocated(allocator_->memory_allocated() - - real_size_); - allocator_->empty_cache(); + auto alloc = static_cast(allocator()); + alloc->async_mem_pool()->put(std::make_tuple(ptr(), size()), + streams_to_events()); + alloc->set_memory_allocated(alloc->memory_allocated() - real_size_); + alloc->empty_cache(); } size_t real_size_ = 0; }; @@ -51,17 +47,13 @@ class RawCachingAllocator : public CacheAllocator { void empty_cache() const override { DIPU_DEBUG_ALLOCATOR(8, "RawCachingAllocator: empty_cache"); - while (!async_mem_pool()->empty()) { - if (async_mem_pool()->ready()) { - auto mem = async_mem_pool()->get(); - void* ptr = std::get<0>(mem); - size_t size = std::get<1>(mem); - size_t nbytes = getAllocateSize(size); - raw_allocator()->raw_deallocate(ptr); - set_memory_reserved(memory_reserved() - nbytes); - } else { - std::this_thread::yield(); - } + + auto& pool = *async_mem_pool(); + for (auto item = pool.pop(); item; item = pool.pop()) { + auto [ptr, size] = item.value(); + auto nbytes = getAllocateSize(size); + raw_allocator()->raw_deallocate(ptr); + set_memory_reserved(memory_reserved() - nbytes); } } diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUSpinMutex.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUSpinMutex.h deleted file mode 100644 index 4ced52c9f..000000000 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUSpinMutex.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2023, DeepLink. -#pragma once - -#include -#include - -namespace dipu { - -// FIXME: someone should remove it someday. -/// Simple spin-lock to help build thread-safe functions. -class SpinMutex { - private: - std::atomic excl_{false}; - - public: - constexpr SpinMutex() noexcept = default; - - SpinMutex(const SpinMutex&) = delete; - - void static delay() noexcept { std::this_thread::yield(); } - - void lock() { - for (bool exp = false; - !excl_.compare_exchange_weak(exp, true, std::memory_order_acq_rel); - exp = false) { - delay(); - } - } - - bool try_lock() { - bool exp = false; - return excl_.compare_exchange_weak(exp, true, std::memory_order_acq_rel); - } - - void unlock() { excl_.store(false, std::memory_order_release); } -}; - -} // namespace dipu diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h new file mode 100644 index 000000000..2fe450981 --- /dev/null +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h @@ -0,0 +1,60 @@ +// Copyright (c) 2023, DeepLink. +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "csrc_dipu/runtime/core/DIPUEvent.h" + +namespace dipu { + +constexpr size_t kMaxAsyncResourcePoolLength = 3; + +template +class AsyncResourceQueue { + using value_type = std::pair>; + + std::deque queue; + mutable std::mutex mutex; + std::atomic_size_t count{}; + + public: + void put(T&& item, std::vector&& events) { + std::scoped_lock _(mutex); + queue.emplace_back(std::move(item), std::move(events)); + count.store(queue.size(), std::memory_order_release); + } + + std::optional pop() { + if (empty()) { + return {}; + } + + std::scoped_lock _(mutex); + if (queue.empty()) { + return {}; + } + + auto& front = queue.front(); + for (auto& event : front.second) { + if (!event.query()) { + return {}; + } + } + + auto item = std::move(front.first); + queue.pop_front(); + count.store(queue.size(), std::memory_order_release); + return item; + } + + bool empty() const { return count.load(std::memory_order_acquire) != 0; } + std::size_t size() const { return count.load(std::memory_order_acquire); } +}; + +} // namespace dipu From 364ec25129203fd0d1883a6ff3e9ff330e3e5be2 Mon Sep 17 00:00:00 2001 From: wiryls <7984500+wiryls@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:18:52 +0800 Subject: [PATCH 2/4] fix: add back template args and rewrite DIPU event --- .../torch_dipu/csrc_dipu/binding/ExportRT.cpp | 4 +- .../csrc_dipu/runtime/core/DIPUEvent.h | 128 ++++++++---------- .../core/allocator/DIPUBFCachingAllocator.cpp | 4 +- .../core/allocator/DIPUBSCachingAllocator.cpp | 2 +- .../core/allocator/DIPUCachingAllocator.h | 49 +++++-- .../allocator/DIPURawCachingAllocator.cpp | 2 +- .../core/allocator/async_resource_pool.h | 60 -------- .../core/allocator/async_resource_queue.h | 60 ++++++++ 8 files changed, 162 insertions(+), 147 deletions(-) delete mode 100644 dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h create mode 100644 dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_queue.h diff --git a/dipu/torch_dipu/csrc_dipu/binding/ExportRT.cpp b/dipu/torch_dipu/csrc_dipu/binding/ExportRT.cpp index c2a5e9d27..0144a8943 100644 --- a/dipu/torch_dipu/csrc_dipu/binding/ExportRT.cpp +++ b/dipu/torch_dipu/csrc_dipu/binding/ExportRT.cpp @@ -186,8 +186,8 @@ static void exportEvent(py::module& m) { }), py::arg("enable_timing") = false, py::arg("blocking") = false, py::arg("interprocess") = false) - .def("record", static_cast(&DIPUEvent::record), - "record event") + .def( + "record", [](DIPUEvent& self) { self.record(); }, "record event") .def("record", pybind11::overload_cast(&DIPUEvent::record), "record event on stream") diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h b/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h index c2270cbfa..341d98079 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h @@ -1,6 +1,7 @@ // Copyright (c) 2023, DeepLink. #pragma once +#include #include #include @@ -13,113 +14,104 @@ namespace dipu { /* * DIPUEvents are movable not copyable wrappers around DIPU's events. * DIPUEvents are constructed lazily when first recorded. + * + * DIPU does not support IpcEventHandle currently. */ class DIPU_API DIPUEvent { + c10::DeviceIndex index{0}; + deviceEvent_t event{nullptr}; + public: - // Constructors - // Default value for `flags` is specified below DIPUEvent() = default; + DIPUEvent(const DIPUEvent&) = delete; + DIPUEvent& operator=(const DIPUEvent&) = delete; - // add flags in future - // DIPUEvent(unsigned int flags) : flags_{flags} {} + DIPUEvent(DIPUEvent&& other) noexcept + : index(other.index), event(other.event) { + other.unsafe_reset(); + } - // dipu do not support IpcEventHandle until now + DIPUEvent& operator=(DIPUEvent&& other) noexcept { + index = other.index; + event = other.event; + other.unsafe_reset(); + return *this; + } ~DIPUEvent() { - try { - if (isCreated()) { - DIPUGuard guard(device_index_); - devproxy::destroyEvent(event_); - } - } catch (...) { /* No throw */ + if (initialized()) { + DIPUGuard _(index); + devproxy::destroyEvent(event); } } - DIPUEvent(const DIPUEvent&) = delete; - DIPUEvent& operator=(const DIPUEvent&) = delete; - - DIPUEvent(DIPUEvent&& other) noexcept = default; - DIPUEvent& operator=(DIPUEvent&& other) noexcept = default; - - explicit operator deviceEvent_t() const { return rawevent(); } + explicit operator deviceEvent_t() const noexcept { return rawevent(); } - // aclrtEvent do not support Less than operator until now - - c10::optional device() const { - if (isCreated()) { - return at::Device(dipu::DIPU_DEVICE_TYPE, device_index_); + c10::optional device() const noexcept { + if (initialized()) { + return at::Device(dipu::DIPU_DEVICE_TYPE, index); } return {}; } - bool isCreated() const { return event_ != nullptr; } - c10::DeviceIndex device_index() const { return device_index_; } - deviceEvent_t rawevent() const { return event_; } + c10::DeviceIndex device_index() const { return index; } + deviceEvent_t rawevent() const { return event; } bool query() const { - if (!isCreated()) { - return true; + if (initialized()) { + DIPUGuard _(index); + return devproxy::getEventStatus(event) == devapis::EventStatus::READY; } - - DIPUGuard guard(device_index_); - return devproxy::getEventStatus(event_) == devapis::EventStatus::READY; + return true; } - void record() { record(getCurrentDIPUStream()); } - - void recordOnce(const DIPUStream& stream) { - if (!was_recorded_) { - record(stream); + void record(const DIPUStream& stream = getCurrentDIPUStream()) { + auto stream_index = stream.device_index(); + auto not_match = false; + { + DIPUGuard _(stream_index); + if (not initialized()) { + index = stream_index; + devproxy::createEvent(&event); + devproxy::recordEvent(event, stream.rawstream()); + } else if (index == stream_index) { + devproxy::recordEvent(event, stream.rawstream()); + } else { + not_match = true; + } } - } - void record(const DIPUStream& stream) { - if (!isCreated()) { - createEvent(stream.device_index()); - } - TORCH_CHECK(device_index_ == stream.device_index(), "Event device ", - device_index_, " does not match recording stream's device ", + TORCH_CHECK(not_match, "Event device ", index, + " does not match recording stream's device ", stream.device_index(), "."); - DIPUGuard guard(device_index_); - devproxy::recordEvent(event_, stream.rawstream()); - was_recorded_ = true; } - void wait(const DIPUStream& stream) { - if (isCreated()) { - DIPUGuard guard(stream.device_index()); - devproxy::streamWaitEvent(stream.rawstream(), event_); + void wait(const DIPUStream& stream) const { + if (initialized()) { + DIPUGuard _(stream.device_index()); + devproxy::streamWaitEvent(stream.rawstream(), event); } } float elapsed_time(const DIPUEvent& other) const { TORCH_CHECK( - isCreated() && other.isCreated(), + initialized() && other.initialized(), "Both events must be recorded before calculating elapsed time."); - float time_ms = 0; - devproxy::eventElapsedTime(&time_ms, event_, other.event_); - return time_ms; + + auto milliseconds = 0.F; + devproxy::eventElapsedTime(&milliseconds, event, other.event); + return milliseconds; } void synchronize() const { - if (isCreated()) { - devproxy::waitEvent(event_); + if (initialized()) { + devproxy::waitEvent(event); } } - // dipu do not support IpcEventHandle until now - private: - unsigned int flags_ = 0; - bool was_recorded_ = false; - c10::DeviceIndex device_index_ = -1; - deviceEvent_t event_ = nullptr; - - void createEvent(c10::DeviceIndex device_index) { - device_index_ = device_index; - DIPUGuard guard(device_index_); - devproxy::createEvent(&event_); - } + bool initialized() const noexcept { return event != nullptr; } + void unsafe_reset() noexcept { event = nullptr; } }; } // namespace dipu diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp index d77c26788..8b9cbd7c3 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBFCachingAllocator.cpp @@ -7,6 +7,8 @@ #include #include +#include "csrc_dipu/runtime/core/DIPUEvent.h" + #include "DIPUCachingAllocator.h" namespace dipu { @@ -483,7 +485,7 @@ class BFCachingAllocator : public CacheAllocator { if (allocator_->impl) { if (ptr()) { allocator_->async_mem_pool()->put(std::make_tuple(ptr(), id_), - streams_to_events()); + listen_streams_ready()); allocator_->set_memory_allocated(allocator_->memory_allocated() - nbytes_); } diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp index 02ab3bf28..cebb22275 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUBSCachingAllocator.cpp @@ -203,7 +203,7 @@ class BSCachingAllocator : public CacheAllocator { << ", size_:" << size()); if (allocator_->impl) { allocator_->async_mem_pool()->put(std::make_tuple(ptr(), size()), - streams_to_events()); + listen_streams_ready()); allocator_->set_memory_allocated(allocator_->memory_allocated() - real_size_); allocator_->flush_mem_pool(); diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h index 3cdb5bfc0..012cc884c 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h @@ -1,13 +1,18 @@ // Copyright (c) 2023, DeepLink. #pragma once +#include +#include + #include #include #include +#include "csrc_dipu/runtime/core/DIPUEvent.h" + #include "DIPUCachingAllocatorUtils.h" #include "DIPURawAllocator.h" -#include "async_resource_pool.h" +#include "async_resource_queue.h" namespace dipu { @@ -42,7 +47,16 @@ const MemoryAlignmentStrategy* getMemoryAlignmentStrategy(); void setMemoryAlignmentStrategy( const MemoryAlignmentStrategy* memoryAlignStrategy); -using AsyncMemPool = AsyncResourceQueue>; +struct EventsListener { + std::vector> events; + bool operator()() const { + auto ready = [](auto& event) { return event->query(); }; + return std::all_of(events.begin(), events.end(), ready); + } +}; + +using AsyncMemPool = + AsyncResourceQueue, EventsListener>; class MemStats { private: @@ -148,13 +162,15 @@ class DIPU_API CacheAllocator : public c10::Allocator, public MemStats { ska::flat_hash_set& streams() { return streams_; } - std::vector streams_to_events() const { + EventsListener listen_streams_ready() const { auto index = std::size_t{}; - auto events = std::vector(streams_.size()); + auto events = std::vector>(streams_.size()); for (auto& stream : streams_) { - events[index++].record(stream); + events[index] = std::make_unique(); + events[index]->record(stream); + index++; } - return events; + return EventsListener{std::move(events)}; } const CacheAllocator* allocator() { return allocator_; } @@ -195,7 +211,8 @@ struct RawAllocator { using type = DIPURawHostAllocator; }; -template +template c10::Allocator* get_allocator_impl(c10::Allocator* raw_allocator) { // Construct when really needed // async_mem_pool is used when cache_allocator being destructed so it should @@ -210,12 +227,13 @@ c10::Allocator* get_allocator_impl(c10::Allocator* raw_allocator) { return &cache_allocator; } -template +template c10::Allocator* get_allocator(int device_id, c10::Allocator* raw_allocator) { -#define DIPU_ALLOCATOR_DISPATCH_DEVICE_ID(id) \ - if (device_id == (id)) { \ - return get_allocator_impl( \ - raw_allocator); \ +#define DIPU_ALLOCATOR_DISPATCH_DEVICE_ID(id) \ + if (device_id == (id)) { \ + return get_allocator_impl(raw_allocator); \ } DIPU_ALLOCATOR_DISPATCH_DEVICE_ID(0); @@ -242,10 +260,13 @@ c10::Allocator* get_allocator(int device_id, c10::Allocator* raw_allocator) { namespace name##device_type { \ static allocator_details::RawAllocator::type \ raw_allocator; \ - using AsyncMemPool = AsyncResourceQueue>; \ + using AsyncMemPool = \ + AsyncResourceQueue, dipu::EventsListener>; \ static const std::function allocator_get_fn = \ std::bind( \ - allocator_details::get_allocator, \ + allocator_details::get_allocator, \ std::placeholders::_1, &raw_allocator); \ static const allocator_details::AllocatorRegisterer g_allocator( \ #name, at::DeviceType::device_type, allocator_get_fn, priority); \ diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp index 4a3e59cfa..020eed24d 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp @@ -21,7 +21,7 @@ class RawCachingAllocator : public CacheAllocator { ~Context() { auto alloc = static_cast(allocator()); alloc->async_mem_pool()->put(std::make_tuple(ptr(), size()), - streams_to_events()); + listen_streams_ready()); alloc->set_memory_allocated(alloc->memory_allocated() - real_size_); alloc->empty_cache(); } diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h deleted file mode 100644 index 2fe450981..000000000 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_pool.h +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) 2023, DeepLink. -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "csrc_dipu/runtime/core/DIPUEvent.h" - -namespace dipu { - -constexpr size_t kMaxAsyncResourcePoolLength = 3; - -template -class AsyncResourceQueue { - using value_type = std::pair>; - - std::deque queue; - mutable std::mutex mutex; - std::atomic_size_t count{}; - - public: - void put(T&& item, std::vector&& events) { - std::scoped_lock _(mutex); - queue.emplace_back(std::move(item), std::move(events)); - count.store(queue.size(), std::memory_order_release); - } - - std::optional pop() { - if (empty()) { - return {}; - } - - std::scoped_lock _(mutex); - if (queue.empty()) { - return {}; - } - - auto& front = queue.front(); - for (auto& event : front.second) { - if (!event.query()) { - return {}; - } - } - - auto item = std::move(front.first); - queue.pop_front(); - count.store(queue.size(), std::memory_order_release); - return item; - } - - bool empty() const { return count.load(std::memory_order_acquire) != 0; } - std::size_t size() const { return count.load(std::memory_order_acquire); } -}; - -} // namespace dipu diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_queue.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_queue.h new file mode 100644 index 000000000..534d7efb3 --- /dev/null +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/async_resource_queue.h @@ -0,0 +1,60 @@ +// Copyright (c) 2023, DeepLink. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace dipu { + +constexpr size_t kMaxAsyncResourcePoolLength = 3; + +template +class AsyncResourceQueue { + std::deque> queue; + std::mutex mutex; + std::atomic_size_t count{}; + + public: + auto put(T&& item, U&& ready) -> void { + std::scoped_lock _(mutex); + queue.emplace_back(std::move(item), std::move(ready)); + count.store(queue.size(), std::memory_order_release); + } + + auto pop() -> std::optional { + if (empty()) { + return {}; + } + + std::scoped_lock _(mutex); + if (queue.empty()) { + return {}; + } + + auto& [item, ready] = queue.front(); + if (not ready()) { + return {}; + } + + auto output = std::move(item); + queue.pop_front(); + count.store(queue.size(), std::memory_order_release); + return output; + } + + auto size() const -> std::size_t { + return count.load(std::memory_order_acquire); + } + + auto empty() const -> bool { + return count.load(std::memory_order_acquire) == 0; + } +}; + +} // namespace dipu From 8f9fab394c44ffe192e2658e3ea1af368ab81a39 Mon Sep 17 00:00:00 2001 From: wiryls <7984500+wiryls@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:17:32 +0800 Subject: [PATCH 3/4] fix: error if not match --- .../csrc_dipu/runtime/core/DIPUEvent.h | 10 ++++----- .../core/allocator/DIPUCachingAllocator.h | 22 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h b/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h index 341d98079..6adce2106 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/DIPUEvent.h @@ -54,8 +54,8 @@ class DIPU_API DIPUEvent { return {}; } - c10::DeviceIndex device_index() const { return index; } - deviceEvent_t rawevent() const { return event; } + c10::DeviceIndex device_index() const noexcept { return index; } + deviceEvent_t rawevent() const noexcept { return event; } bool query() const { if (initialized()) { @@ -67,7 +67,7 @@ class DIPU_API DIPUEvent { void record(const DIPUStream& stream = getCurrentDIPUStream()) { auto stream_index = stream.device_index(); - auto not_match = false; + auto match = true; { DIPUGuard _(stream_index); if (not initialized()) { @@ -77,11 +77,11 @@ class DIPU_API DIPUEvent { } else if (index == stream_index) { devproxy::recordEvent(event, stream.rawstream()); } else { - not_match = true; + match = false; } } - TORCH_CHECK(not_match, "Event device ", index, + TORCH_CHECK(match, "Event device ", index, " does not match recording stream's device ", stream.device_index(), "."); } diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h index 012cc884c..1e0a520d7 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPUCachingAllocator.h @@ -47,16 +47,18 @@ const MemoryAlignmentStrategy* getMemoryAlignmentStrategy(); void setMemoryAlignmentStrategy( const MemoryAlignmentStrategy* memoryAlignStrategy); +namespace detail { struct EventsListener { - std::vector> events; + std::vector events; bool operator()() const { - auto ready = [](auto& event) { return event->query(); }; + auto ready = [](auto& event) { return event.query(); }; return std::all_of(events.begin(), events.end(), ready); } }; +} // namespace detail using AsyncMemPool = - AsyncResourceQueue, EventsListener>; + AsyncResourceQueue, detail::EventsListener>; class MemStats { private: @@ -162,15 +164,13 @@ class DIPU_API CacheAllocator : public c10::Allocator, public MemStats { ska::flat_hash_set& streams() { return streams_; } - EventsListener listen_streams_ready() const { + detail::EventsListener listen_streams_ready() const { auto index = std::size_t{}; - auto events = std::vector>(streams_.size()); + auto events = std::vector(streams_.size()); for (auto& stream : streams_) { - events[index] = std::make_unique(); - events[index]->record(stream); - index++; + events[index++].record(stream); } - return EventsListener{std::move(events)}; + return detail::EventsListener{std::move(events)}; } const CacheAllocator* allocator() { return allocator_; } @@ -260,8 +260,8 @@ c10::Allocator* get_allocator(int device_id, c10::Allocator* raw_allocator) { namespace name##device_type { \ static allocator_details::RawAllocator::type \ raw_allocator; \ - using AsyncMemPool = \ - AsyncResourceQueue, dipu::EventsListener>; \ + using AsyncMemPool = AsyncResourceQueue, \ + dipu::detail::EventsListener>; \ static const std::function allocator_get_fn = \ std::bind( \ allocator_details::get_allocator Date: Wed, 27 Mar 2024 18:55:08 +0800 Subject: [PATCH 4/4] fix: check if pool is empty --- .../core/allocator/DIPURawCachingAllocator.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp index 020eed24d..647e97b62 100644 --- a/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp +++ b/dipu/torch_dipu/csrc_dipu/runtime/core/allocator/DIPURawCachingAllocator.cpp @@ -48,12 +48,14 @@ class RawCachingAllocator : public CacheAllocator { void empty_cache() const override { DIPU_DEBUG_ALLOCATOR(8, "RawCachingAllocator: empty_cache"); - auto& pool = *async_mem_pool(); - for (auto item = pool.pop(); item; item = pool.pop()) { - auto [ptr, size] = item.value(); - auto nbytes = getAllocateSize(size); - raw_allocator()->raw_deallocate(ptr); - set_memory_reserved(memory_reserved() - nbytes); + for (auto& pool = *async_mem_pool(); not pool.empty();) { + std::this_thread::yield(); + for (auto item = pool.pop(); item; item = pool.pop()) { + auto [ptr, size] = item.value(); + auto nbytes = getAllocateSize(size); + raw_allocator()->raw_deallocate(ptr); + set_memory_reserved(memory_reserved() - nbytes); + } } }