Add sync execution function to handle synchronization during monitoring window operations
sharonsyh committed Jan 9, 2025
1 parent 2276ac2 commit dea8b5d
Showing 1 changed file with 66 additions and 94 deletions.
160 changes: 66 additions & 94 deletions zeus/metric.py
@@ -20,7 +20,7 @@

from zeus.monitor.power import PowerMonitor
from zeus.monitor.energy import ZeusMonitor

from zeus.utils.framework import sync_execution as sync_execution_fn
from zeus.device.cpu import get_cpus
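The import is aliased to sync_execution_fn, presumably so the function is not shadowed by the sync_execution boolean parameters used throughout this file. A minimal sketch of the call pattern the rest of this diff applies (gpu_indices stands for whatever index list the metric was constructed with):

    if sync_execution:
        # Block until asynchronously dispatched GPU computations have finished,
        # so the measurement window starts and ends at well-defined points.
        sync_execution_fn(gpu_indices)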


@@ -45,7 +45,8 @@ def begin_window(self, name: str, sync_execution: bool = True) -> None:
Args:
name (str): Name of the measurement window.
sync_execution (bool): Whether to execute synchronously. Defaults to None.
sync_execution (bool): Whether to wait for asynchronously dispatched computations
to finish before starting the measurement window.
"""
pass

@@ -55,7 +56,8 @@ def end_window(self, name: str, sync_execution: bool = True) -> None:
Args:
name (str): Name of the measurement window.
sync_execution (bool): Whether to execute synchronously. Defaults to None.
sync_execution (bool): Whether to wait for asynchronously dispatched computations
to finish before ending the measurement window.
"""
pass

@@ -64,25 +66,13 @@ class EnergyHistogram(Metric):
"""Measures the energy consumption a code range and exports a histogram metrics.
Tracks energy consumption for GPUs, CPUs, and DRAM as Prometheus Histogram metrics.
Attributes:
cpu_indices: List of CPU indices to monitor.
gpu_indices: List of GPU indices to monitor.
prometheus_url: Prometheus Push Gateway URL.
job: Prometheus job name.
gpu_bucket_range: Histogram buckets for GPU energy.
cpu_bucket_range: Histogram buckets for CPU energy.
dram_bucket_range: Histogram buckets for DRAM energy.
gpu_histograms: A single Prometheus Histogram metric for all GPU energy consumption, indexed by window and GPU index.
cpu_histograms: A single Prometheus Histogram metric for all CPU energy consumption, indexed by window and CPU index.
dram_histograms: A single Prometheus Histogram metric for all DRAM energy consumption, indexed by window and DRAM index.
"""

def __init__(
self,
cpu_indices: list,
gpu_indices: list,
prometheus_url: str,
pushgateway_url: str,
job: str,
gpu_bucket_range: Sequence[float] = [50.0, 100.0, 200.0, 500.0, 1000.0],
cpu_bucket_range: Sequence[float] = [10.0, 50.0, 100.0, 500.0, 1000.0],
@@ -96,7 +86,7 @@ def __init__(
Args:
cpu_indices (list): List of CPU indices to monitor.
gpu_indices (list): List of GPU indices to monitor.
prometheus_url (str): URL of the Prometheus Push Gateway where metrics will be pushed.
pushgateway_url (str): URL of the Prometheus Push Gateway where metrics will be pushed.
job (str): Name of the Prometheus job to associate with the energy metrics.
gpu_bucket_range (list[float], optional): Bucket ranges for GPU energy histograms.
Defaults to [50.0, 100.0, 200.0, 500.0, 1000.0].
@@ -113,7 +103,7 @@ def __init__(
self.dram_bucket_range = dram_bucket_range
self.cpu_indices = cpu_indices
self.gpu_indices = gpu_indices
self.prometheus_url = prometheus_url
self.pushgateway_url = pushgateway_url
self.job = job
self.registry = CollectorRegistry()

@@ -175,9 +165,14 @@ def begin_window(self, name: str, sync_execution: bool = True) -> None:
Args:
name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
sync_execution (bool): Whether to execute synchronously. Defaults to True.
sync_execution (bool): Whether to execute synchronously. Defaults to True. If True, calls sync_execution_fn with the monitored GPU indices before the window begins.
"""
self.energy_monitor.begin_window(f"__EnergyHistogram_{name}", sync_execution)
if sync_execution:
sync_execution_fn(self.gpu_indices)

self.energy_monitor.begin_window(
f"__EnergyHistogram_{name}", sync_execution=sync_execution
)

def end_window(self, name: str, sync_execution: bool = True) -> None:
"""End the current energy monitoring window and record the energy data.
@@ -188,14 +183,12 @@ def end_window(self, name: str, sync_execution: bool = True) -> None:
Args:
name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
sync_execution (bool): Whether to execute synchronously. Defaults to True.
Pushes:
- GPU energy data to the Prometheus Push Gateway via the associated Histogram metric.
- CPU energy data to the Prometheus Push Gateway via the associated Histogram metric.
- DRAM energy data to the Prometheus Push Gateway via the associated Histogram metric.
"""
if sync_execution:
sync_execution_fn(self.gpu_indices)

measurement = self.energy_monitor.end_window(
f"__EnergyHistogram_{name}", sync_execution
f"__EnergyHistogram_{name}", sync_execution=sync_execution
)

if measurement.gpu_energy:
@@ -242,11 +235,11 @@ def end_window(self, name: str, sync_execution: bool = True) -> None:
)
if dram_energy < self.min_dram_bucket:
warnings.warn(
f"CPU {dram_index} energy {dram_energy} exceeds the minimum bucket value of {self.min_dram_bucket}",
f"DRAM {dram_index} energy {dram_energy} exceeds the minimum bucket value of {self.min_dram_bucket}",
stacklevel=1,
)

push_to_gateway(self.prometheus_url, job=self.job, registry=self.registry)
push_to_gateway(self.pushgateway_url, job=self.job, registry=self.registry)
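For reference, a minimal usage sketch of EnergyHistogram as it stands after this change, using only the constructor and window API shown in this diff; the Push Gateway address, job name, and device indices are placeholders:

    from zeus.metric import EnergyHistogram

    histogram = EnergyHistogram(
        cpu_indices=[0],                          # placeholder CPU package index
        gpu_indices=[0],                          # placeholder GPU index
        pushgateway_url="http://localhost:9091",  # placeholder Push Gateway address
        job="zeus_energy_histogram",              # placeholder Prometheus job name
    )

    histogram.begin_window("train_step")   # sync_execution defaults to True, so GPUs are synchronized first
    # ... code range whose energy is being measured ...
    histogram.end_window("train_step")     # records energy into the histograms and pushes to the gateway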


class EnergyCumulativeCounter(Metric):
@@ -257,26 +250,14 @@ class EnergyCumulativeCounter(Metric):
The cumulative nature of the Counter ensures that energy values are always incremented over time, never reset,
which is ideal for tracking continuously increasing values like energy usage.
Attributes:
energy_monitor: The ZeusMonitor instance that collects energy consumption data for the system.
update_period: The interval (in seconds) between consecutive energy data updates.
prometheus_url: The URL of the Prometheus Push Gateway where the Counter metrics will be pushed.
job: The name of the job associated with the energy monitoring in Prometheus.
gpu_counters: A single Prometheus Counter metric for all GPU energy consumption, indexed by window and GPU index.
cpu_counters: A single Prometheus Counter metric for all CPU energy consumption, indexed by window and CPU index.
dram_counters: A single Prometheus Counter metric for all DRAM energy consumption, indexed by window and DRAM index.
queue: A multiprocessing queue used to send signals to start/stop energy monitoring.
proc: A multiprocessing process that runs the energy monitoring loop.
window_state: A dictionary that maps the monitoring window names to their corresponding process state.
"""

def __init__(
self,
cpu_indices: list,
gpu_indices: list,
update_period: int,
prometheus_url: str,
pushgateway_url: str,
job: str,
) -> None:
"""Initialize the EnergyCumulativeCounter.
@@ -285,16 +266,14 @@ def __init__(
cpu_indices (list): List of CPU indices to monitor.
gpu_indices (list): List of GPU indices to monitor.
update_period: The time interval (in seconds) at which energy measurements are updated.
prometheus_url: The URL for the Prometheus Push Gateway where the metrics will be pushed.
pushgateway_url: The URL for the Prometheus Push Gateway where the metrics will be pushed.
job: The name of the job to be associated with the Prometheus metrics.
"""
self.cpu_indices = cpu_indices
self.gpu_indices = gpu_indices
self.update_period = update_period
self.prometheus_url = prometheus_url
self.pushgateway_url = pushgateway_url
self.job = job
self.queue = None
self.proc = None
self.window_state: dict[str, MonitoringProcessState] = {}

def begin_window(self, name: str, sync_execution: bool = False) -> None:
@@ -307,28 +286,28 @@ def begin_window(self, name: str, sync_execution: bool = False) -> None:
name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False.
"""
if sync_execution:
sync_execution_fn(self.gpu_indices)

context = mp.get_context("spawn")
self.queue = context.Queue()
self.proc = context.Process(
queue = context.Queue()
proc = context.Process(
target=energy_monitoring_loop,
args=(
name,
self.queue,
queue,
self.cpu_indices,
self.gpu_indices,
self.update_period,
self.prometheus_url,
self.pushgateway_url,
self.job,
sync_execution,
),
)
self.proc.start()
if not self.proc.is_alive():
proc.start()
if not proc.is_alive():
raise RuntimeError(f"Failed to start monitoring process for {name}.")

self.window_state[name] = MonitoringProcessState(
queue=self.queue, proc=self.proc
)
self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc)

def end_window(self, name: str, sync_execution: bool = False) -> None:
"""End the energy monitoring window.
@@ -340,11 +319,11 @@ def end_window(self, name: str, sync_execution: bool = False) -> None:
if name not in self.window_state:
raise ValueError(f"No active monitoring process found for '{name}'.")

if sync_execution:
sync_execution_fn(self.gpu_indices)

state = self.window_state.pop(name)
if self.queue is not None:
self.queue.put("stop")
else:
raise RuntimeError("Queue is not initialized")
state.queue.put("stop")
state.proc.join(timeout=20)

if state.proc.is_alive():
@@ -357,9 +336,8 @@ def energy_monitoring_loop(
cpu_indices: list,
gpu_indices: list,
update_period: int,
prometheus_url: str,
pushgateway_url: str,
job: str,
sync_execution: bool,
) -> None:
"""Runs in a separate process to collect and update energy consumption metrics (for GPUs, CPUs, and DRAM).
@@ -369,7 +347,7 @@
cpu_indices (list): List of CPU indices to monitor.
gpu_indices (list): List of GPU indices to monitor.
update_period (int): The interval (in seconds) between consecutive energy data updates.
prometheus_url (str): The URL of the Prometheus Push Gateway where the metrics will be pushed.
pushgateway_url (str): The URL of the Prometheus Push Gateway where the metrics will be pushed.
job (str): The name of the Prometheus job associated with these metrics.
sync_execution (bool): Whether to execute monitoring synchronously.
"""
@@ -406,10 +384,12 @@
if not pipe.empty():
break
# Begin and end monitoring window using sync_execution
energy_monitor.begin_window(f"__EnergyCumulativeCounter_{name}", sync_execution)
energy_monitor.begin_window(
f"__EnergyCumulativeCounter_{name}", sync_execution=False
)
time.sleep(update_period)
measurement = energy_monitor.end_window(
f"__EnergyCumulativeCounter_{name}", sync_execution
f"__EnergyCumulativeCounter_{name}", sync_execution=False
)

if measurement.gpu_energy:
Expand All @@ -427,7 +407,7 @@ def energy_monitoring_loop(
if dram_counters:
dram_counters.labels(window=name, index=dram_index).inc(energy)
# Push metrics to Prometheus
push_to_gateway(prometheus_url, job=job, registry=registry)
push_to_gateway(pushgateway_url, job=job, registry=registry)
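A matching usage sketch for EnergyCumulativeCounter, again using only names from this diff with placeholder values; begin_window spawns the monitoring loop above, which keeps pushing cumulative totals until end_window sends the stop signal:

    from zeus.metric import EnergyCumulativeCounter

    counter = EnergyCumulativeCounter(
        cpu_indices=[0],                          # placeholder CPU package index
        gpu_indices=[0],                          # placeholder GPU index
        update_period=5,                          # push updated totals every 5 seconds
        pushgateway_url="http://localhost:9091",  # placeholder Push Gateway address
        job="zeus_energy_counter",                # placeholder Prometheus job name
    )

    counter.begin_window("training_run")   # starts the spawned energy_monitoring_loop process
    # ... long-running work whose cumulative energy is tracked ...
    counter.end_window("training_run")     # puts "stop" on the queue and joins the process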


class PowerGauge(Metric):
@@ -437,36 +417,26 @@ class PowerGauge(Metric):
The Gauge metric type is suitable for tracking values that can go up and down over time, like power consumption.
Power usage data is collected at regular intervals and pushed to a Prometheus Push Gateway for monitoring.
Attributes:
gpu_indices: List of GPU indices to monitor for power consumption.
update_period: Time interval (in seconds) between consecutive power measurements.
prometheus_url: URL of the Prometheus Push Gateway where Gauge metrics are pushed.
job: Name of the Prometheus job associated with the power metrics.
gpu_gauges: A single Prometheus Gauge metric for real-time power consumption tracking.
queue: Queue for controlling the monitoring process.
proc: Process running the power monitoring loop.
window_state: A dictionary mapping monitoring window names to their process state.
"""

def __init__(
self,
gpu_indices: list,
update_period: int,
prometheus_url: str,
pushgateway_url: str,
job: str,
) -> None:
"""Initialize the PowerGauge metric.
Args:
gpu_indices (list[int]): List of GPU indices to monitor for power consumption.
update_period (int): Interval (in seconds) between consecutive power measurements.
prometheus_url (str): URL of the Prometheus Push Gateway where Gauge metrics are pushed.
pushgateway_url (str): URL of the Prometheus Push Gateway where Gauge metrics are pushed.
job (str): Name of the Prometheus job to associate with the power metrics.
"""
self.gpu_indices = gpu_indices
self.update_period = update_period
self.prometheus_url = prometheus_url
self.pushgateway_url = pushgateway_url
self.job = job
self.window_state: dict[str, MonitoringProcessState] = {}

@@ -484,28 +454,29 @@ def begin_window(self, name: str, sync_execution: bool = False) -> None:
if name in self.window_state:
raise ValueError(f"PowerGauge metric '{name}' already exists.")

if sync_execution:
sync_execution_fn(self.gpu_indices)

context = mp.get_context("spawn")
self.queue = context.Queue()
self.proc = context.Process(
queue = context.Queue()
proc = context.Process(
target=power_monitoring_loop,
args=(
name,
self.queue,
queue,
self.gpu_indices,
self.update_period,
self.prometheus_url,
self.pushgateway_url,
self.job,
),
)
self.proc.start()
if not self.proc.is_alive():
proc.start()
if not proc.is_alive():
raise RuntimeError(
f"Failed to start power monitoring process for '{name}'."
)

self.window_state[name] = MonitoringProcessState(
queue=self.queue, proc=self.proc
)
self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc)

def end_window(self, name: str, sync_execution: bool = False) -> None:
"""End the power monitoring window.
@@ -514,12 +485,13 @@ def end_window(self, name: str, sync_execution: bool = False) -> None:
name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False.
"""
if sync_execution:
sync_execution_fn(self.gpu_indices)

state = self.window_state.pop(name)
if self.queue is not None:
self.queue.put("stop")
else:
raise RuntimeError("Queue is not initialized")
state.queue.put("stop")
state.proc.join(timeout=20)

if state.proc.is_alive():
state.proc.terminate()
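And a corresponding sketch for PowerGauge, which samples GPU power in a background process; all values below are placeholders:

    from zeus.metric import PowerGauge

    gauge = PowerGauge(
        gpu_indices=[0],                          # placeholder GPU index
        update_period=2,                          # poll GPU power every 2 seconds
        pushgateway_url="http://localhost:9091",  # placeholder Push Gateway address
        job="zeus_power_gauge",                   # placeholder Prometheus job name
    )

    gauge.begin_window("inference")   # starts power_monitoring_loop in a spawned process
    # ... workload whose power draw is being observed ...
    gauge.end_window("inference")     # sends the stop signal and joins the monitoring process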

@@ -529,7 +501,7 @@ def power_monitoring_loop(
pipe: mp.Queue,
gpu_indices: list[int],
update_period: int,
prometheus_url: str,
pushgateway_url: str,
job: str,
) -> None:
"""Runs in a separate process and periodically collects power consumption data for each GPU and pushes the results to the Prometheus Push Gateway.
@@ -539,7 +511,7 @@
pipe (multiprocessing.Queue): Queue to receive control signals (e.g., "stop").
gpu_indices (list[int]): List of GPU indices to monitor for power consumption.
update_period (int): Interval (in seconds) between consecutive power data polls.
prometheus_url (str): URL of the Prometheus Push Gateway where metrics are pushed.
pushgateway_url (str): URL of the Prometheus Push Gateway where metrics are pushed.
job (str): Name of the Prometheus job to associate with the metrics.
"""
power_monitor = PowerMonitor(gpu_indices=gpu_indices)
@@ -566,7 +538,7 @@
print(f"Error during processing power measurement: {e}")

try:
push_to_gateway(prometheus_url, job=job, registry=registry)
push_to_gateway(pushgateway_url, job=job, registry=registry)
except Exception as e:
print(f"Error pushing metrics: {e}")

