Skip to content

Commit

Permalink
[dipu] feat: add metrics collector for allocator (#815)
Browse files Browse the repository at this point in the history
* feat: add metrics collector for allocator

* refactor: rewrite metrics

* debug: avoid destruction

* fix: should not use moved value

* refactor: simplifiy and format some code

* feat: format, rename and add comments

* feat: sum for histogram and addsub for float

* fix: rename value to group

* refactor: rewrite some code, rename some classes and so son

* doc: add comments

* feat: add python interface and tests

* fix: test condition

* fix: python format

* feat: remove reset from group

* refactor: improve test and its message

* chore: debug test_metrics.py

* refactor: rename file and fix typo

* chore: add a missing extern

* fix: skip test if not bfc
  • Loading branch information
wiryls authored Jun 18, 2024
1 parent 7c5b571 commit 47076a6
Show file tree
Hide file tree
Showing 13 changed files with 1,288 additions and 4 deletions.
66 changes: 66 additions & 0 deletions dipu/tests/python/unittests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2023, DeepLink.
import os
import random
import torch
import torch_dipu
from collections.abc import Iterable
from torch_dipu.testing._internal.common_utils import TestCase, run_tests
from typing import Protocol


class MetricsGroup(Protocol):
name: str
type: str
help: str
values: list[
# labels, value | histogram ( thresholds, buckets, sum)
(list[(str, str)], int | float | tuple[list[int] | list[float], list[int], int])
]


def lookup(groups: Iterable[MetricsGroup], name: str, labels: list[(str, str)]):
item = next(x for x in groups if x.name == name)
keys = set(labels)
key, value = next((k, v) for k, v in item.values if keys.issubset(k))
return key, *value


def allocate_tensor(count: int) -> int:
total = 0
for _ in range(0, count):
nbytes = random.randrange(0, 100000)
total += nbytes
tensor = torch.empty(size=(nbytes,), dtype=torch.uint8, device="dipu")
return total


class TestMetrics(TestCase):
def test_bfc_allocator_metrics(self):
if os.environ.get("DIPU_DEVICE_MEMCACHING_ALGORITHM", "") != "BF":
return

allocate_tensor(1) # preheat

name = "allocator_size"
labels = [("type", "caching"), ("device", "0"), ("method", "allocate")]

last_label, _, last_bucket, last_size = lookup(
torch_dipu._C.metrics(), name, labels
)

expected_count = 100
expected_total = allocate_tensor(expected_count) # allocate

next_label, _, next_bucket, next_size = lookup(
torch_dipu._C.metrics(), name, labels
)
count = sum(next_bucket) - sum(last_bucket)
total = next_size - last_size

self.assertEqual(last_label, next_label)
self.assertEqual(expected_count, count, msg=f"{expected_count} == {count}")
self.assertLessEqual(expected_total, total, msg=f"{expected_total} <= {total}")


if __name__ == "__main__":
run_tests()
2 changes: 2 additions & 0 deletions dipu/torch_dipu/csrc_dipu/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ set(TORCH_DIPU_SOURCE
diopirt/diopirt_impl.cpp
diopirt/diopi_helper.cpp

metrics/metrics.cpp

profiler/collection.cpp
profiler/CorrelationIDManager.cpp
profiler/profiler.cpp
Expand Down
30 changes: 30 additions & 0 deletions dipu/torch_dipu/csrc_dipu/binding/ExportRT.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
// Copyright (c) 2023, DeepLink.
#include <sstream>
#include <tuple>
#include <utility>

#include <ATen/autocast_mode.h>
#include <c10/core/Device.h>
#include <torch/csrc/Device.h>
#include <torch/csrc/utils/pybind.h>

#include <pybind11/chrono.h>
#include <pybind11/detail/common.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>

#include "csrc_dipu/aten/DIPUATenFunctions.h"
#include "csrc_dipu/base/DIPUGlobals.h"
#include "csrc_dipu/metrics/metrics.h"
#include "csrc_dipu/runtime/rthelper.h"
#include "csrc_dipu/utils/helpfunc.hpp"
#include "csrc_dipu/utils/vender_helper.hpp"
Expand Down Expand Up @@ -351,6 +357,29 @@ static void exportUtils(py::module& m) {
m.def("get_dipu_torch_version", []() -> int { return DIPU_TORCH_VERSION; });
}

static void exportMetrics(py::module& m) {
using group = metrics::ExportedGroup;

m.def("metrics", []() -> std::vector<group> {
return group::from_collector(metrics::default_collector());
});
m.def("is_metrics_enabled", []() -> bool { return metrics::enable(); });
m.def("enable_metrics", [](bool value) -> void { metrics::enable(value); });

py::class_<group>(m, "MetricsGroup")
.def(py::init<>())
.def_readwrite("name", &group::name)
.def_readwrite("type", &group::type)
.def_readwrite("info", &group::info)
.def_readwrite("values", &group::values)
.def("asdict", [](group const& x) -> py::dict {
// NOLINTNEXTLINE(google-build-using-namespace)
using namespace pybind11::literals;
return py::dict("name"_a = x.name, "type"_a = x.type, "info"_a = x.info,
"values"_a = x.values);
});
}

extern void patchTorchCsrcDevice(PyObject* module);

DIPU_API void exportDIPURuntime(PyObject* module) {
Expand All @@ -367,5 +396,6 @@ DIPU_API void exportDIPURuntime(PyObject* module) {
exportGenerator(m);
exportAutocast(m);
exportUtils(m);
exportMetrics(m);
}
} // namespace dipu
193 changes: 193 additions & 0 deletions dipu/torch_dipu/csrc_dipu/metrics/detail/collector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#pragma once

#include <atomic>
#include <functional>
#include <memory>
#include <shared_mutex>
#include <type_traits>
#include <unordered_map>
#include <variant>

#include "label.h"

namespace dipu::metrics::detail {

// shared_value is internal wrapper for other value type (Counter, Gauge...)
// inside Group.
//
// Warning: shared_value is managed by Group, should not use directly by user.
template <typename T>
struct shared_value {
T value;
std::atomic_bool use{}; // TODO(cxx20): upgrade to std::atomic_flag
std::atomic_uint count{};

explicit shared_value(T const& source) noexcept : value(source) {}
auto incref() noexcept -> void { count.fetch_add(1); }
auto decref() noexcept -> void { count.fetch_sub(1); }
auto touch() noexcept -> T& {
use.store(true, std::memory_order_release);
return value;
}
auto unused() const noexcept -> bool {
return not use.load(std::memory_order_acquire) and count.load() == 0;
}
};

// group is managed by collector, and should not use directly by user.
template <typename S /* string type */, typename T /* value type */>
class group : public std::enable_shared_from_this<group<S, T>> {
S group_name;
S group_type;
S group_description;
T default_value;
std::shared_mutex mutable mutex;
std::unordered_map<labelset<S>, shared_value<T>> values;

public:
using value_type = typename decltype(values)::value_type;

template <typename... U>
explicit group(S name, S type, S help, U&&... args) noexcept
: group_name(std::move(name)), //
group_type(std::move(type)), //
group_description(std::move(help)), //
default_value(std::forward<U>(args)...) {}

auto name() const noexcept -> S const& { return group_name; }
auto type() const noexcept -> S const& { return group_type; }
auto description() const noexcept -> S const& { return group_description; }

template <typename U /* LabeledValue */>
[[nodiscard]] auto make(labelset<S> labels) -> U {
{ // return created if found
std::shared_lock _(mutex);
if (auto iter = values.find(labels); iter != values.end()) {
return U(this->shared_from_this(), *iter);
}
}
{ // return a newly created shared_value if not found
std::unique_lock _(mutex);
auto [iter, done] = values.try_emplace(std::move(labels), default_value);
return U(this->shared_from_this(), *iter);
}
}

template <typename F>
auto for_each(F f) -> void {
static_assert(std::is_invocable_v<F, labelset<S> const&, T&> ||
std::is_invocable_v<F, labelset<S> const&, T const&>);

auto found_unused = false;
{
std::shared_lock _(mutex);
for (auto& [key, value] : values) {
if (value.unused()) {
found_unused = true;
} else {
f(key, value.value);
}
}
}

if (found_unused) {
std::unique_lock _(mutex);
for (auto iter = values.begin(); iter != values.end();) {
if (iter->second.unused()) {
iter = values.erase(iter);
} else {
++iter;
}
}
}
}

[[nodiscard]] auto size() const -> std::size_t {
std::shared_lock _(mutex);
return values.size();
}
};

// collector should not be used directly. Please see Collector (outside
// detail namespace).
template <typename S, typename... V>
class collector {
public:
using string = S;
using variant = std::variant<std::shared_ptr<group<S, V>>...>;

private:
std::shared_mutex mutable mutex;
std::unordered_map<S, variant> groups;

public:
// In general, U should be LabeledValue.
template <typename U, typename T, typename... A>
[[nodiscard]] auto make(char const* hint, S name, A&&... args) -> U {
static_assert((std::is_same_v<T, V> or ...), "V must be one of {T, ...}");

if (name.empty()) {
auto m = std::string(hint) + " name cannot be empty";
throw std::invalid_argument(m);
}

// Find or create a named variant group.
auto& named = make_variant<T>(std::move(name), std::forward<A>(args)...);
auto* which = std::get_if<std::shared_ptr<group<S, T>>>(&named);

if (which == nullptr) {
auto m = std::string("expect type ") + hint + " but index is " +
std::to_string(named.index());
throw std::invalid_argument(m);
}

if (*which == nullptr) {
throw std::runtime_error("unexpected null shared pointer");
}

return (**which).template make<U>({});
}

[[nodiscard]] auto list() const
-> std::vector<std::reference_wrapper<variant const>> {
auto output = std::vector<std::reference_wrapper<variant const>>();
std::shared_lock _(mutex);
output.reserve(groups.size());
for (auto& [_, which] : groups) {
output.emplace_back(which);
}
return output;
}

[[nodiscard]] auto size() const -> std::size_t {
std::shared_lock _(mutex);
return groups.size();
}

private:
template <typename T, typename... A>
auto make_variant(S name, A&&... args) -> variant& {
// Return the variant if found, or insert a new variant into gourps.
{
std::shared_lock _(mutex);
if (auto iter = groups.find(name); iter != groups.end()) {
return iter->second;
}
}
{
// Avoid invoking std::make_shared if name already existed. Here we use a
// empty shared pointer as dummy.
using type = group<S, T>;
auto dummy = std::shared_ptr<type>();

std::unique_lock _(mutex);
auto [iter, inserted] = groups.try_emplace(name, dummy);
if (inserted) {
iter->second = std::make_shared<type>(name, std::forward<A>(args)...);
}
return iter->second;
}
}
};

} // namespace dipu::metrics::detail
Loading

0 comments on commit 47076a6

Please sign in to comment.