From dea8b5df3787f3787c59961d545403a4302df61f Mon Sep 17 00:00:00 2001 From: sharonsyh Date: Fri, 10 Jan 2025 00:47:41 +0900 Subject: [PATCH] Add sync execution function to handle synchronization during monitoring window operations --- zeus/metric.py | 160 ++++++++++++++++++++----------------------------- 1 file changed, 66 insertions(+), 94 deletions(-) diff --git a/zeus/metric.py b/zeus/metric.py index 14f38fa6..5555dbd0 100644 --- a/zeus/metric.py +++ b/zeus/metric.py @@ -20,7 +20,7 @@ from zeus.monitor.power import PowerMonitor from zeus.monitor.energy import ZeusMonitor - +from zeus.utils.framework import sync_execution as sync_execution_fn from zeus.device.cpu import get_cpus @@ -45,7 +45,8 @@ def begin_window(self, name: str, sync_execution: bool = True) -> None: Args: name (str): Name of the measurement window. - sync_execution (bool): Whether to execute synchronously. Defaults to None. + sync_execution (bool): Whether to wait for asynchronously dispatched computations + to finish before starting the measurement window. """ pass @@ -55,7 +56,8 @@ def end_window(self, name: str, sync_execution: bool = True) -> None: Args: name (str): Name of the measurement window. - sync_execution (bool): Whether to execute synchronously. Defaults to None. + sync_execution (bool): Whether to wait for asynchronously dispatched computations + to finish before starting the measurement window. """ pass @@ -64,25 +66,13 @@ class EnergyHistogram(Metric): """Measures the energy consumption a code range and exports a histogram metrics. Tracks energy consumption for GPUs, CPUs, and DRAM as Prometheus Histogram metrics. - - Attributes: - cpu_indices: List of CPU indices to monitor. - gpu_indices: List of GPU indices to monitor. - prometheus_url: Prometheus Push Gateway URL. - job: Prometheus job name. - gpu_bucket_range: Histogram buckets for GPU energy. - cpu_bucket_range: Histogram buckets for CPU energy. - dram_bucket_range: Histogram buckets for DRAM energy. - gpu_histograms: A single Prometheus Histogram metric for all GPU energy consumption, indexed by window and GPU index. - cpu_histograms: A single Prometheus Histogram metric for all CPU energy consumption, indexed by window and CPU index. - dram_histograms: A single Prometheus Histogram metric for all DRAM energy consumption, indexed by window and DRAM index. """ def __init__( self, cpu_indices: list, gpu_indices: list, - prometheus_url: str, + pushgateway_url: str, job: str, gpu_bucket_range: Sequence[float] = [50.0, 100.0, 200.0, 500.0, 1000.0], cpu_bucket_range: Sequence[float] = [10.0, 50.0, 100.0, 500.0, 1000.0], @@ -96,7 +86,7 @@ def __init__( Args: cpu_indices (list): List of CPU indices to monitor. gpu_indices (list): List of GPU indices to monitor. - prometheus_url (str): URL of the Prometheus Push Gateway where metrics will be pushed. + pushgateway_url (str): URL of the Prometheus Push Gateway where metrics will be pushed. job (str): Name of the Prometheus job to associate with the energy metrics. gpu_bucket_range (list[float], optional): Bucket ranges for GPU energy histograms. Defaults to [50.0, 100.0, 200.0, 500.0, 1000.0]. @@ -113,7 +103,7 @@ def __init__( self.dram_bucket_range = dram_bucket_range self.cpu_indices = cpu_indices self.gpu_indices = gpu_indices - self.prometheus_url = prometheus_url + self.pushgateway_url = pushgateway_url self.job = job self.registry = CollectorRegistry() @@ -175,9 +165,14 @@ def begin_window(self, name: str, sync_execution: bool = True) -> None: Args: name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'. - sync_execution (bool): Whether to execute synchronously. Defaults to True. + sync_execution (bool): Whether to execute synchronously. Defaults to True. If assigned True, calls sync_execution_fn with the defined gpu """ - self.energy_monitor.begin_window(f"__EnergyHistogram_{name}", sync_execution) + if sync_execution: + sync_execution_fn(self.gpu_indices) + + self.energy_monitor.begin_window( + f"__EnergyHistogram_{name}", sync_execution=sync_execution + ) def end_window(self, name: str, sync_execution: bool = True) -> None: """End the current energy monitoring window and record the energy data. @@ -188,14 +183,12 @@ def end_window(self, name: str, sync_execution: bool = True) -> None: Args: name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'. sync_execution (bool): Whether to execute synchronously. Defaults to True. - - Pushes: - - GPU energy data to the Prometheus Push Gateway via the associated Histogram metric. - - CPU energy data to the Prometheus Push Gateway via the associated Histogram metric. - - DRAM energy data to the Prometheus Push Gateway via the associated Histogram metric. """ + if sync_execution: + sync_execution_fn(self.gpu_indices) + measurement = self.energy_monitor.end_window( - f"__EnergyHistogram_{name}", sync_execution + f"__EnergyHistogram_{name}", sync_execution=sync_execution ) if measurement.gpu_energy: @@ -242,11 +235,11 @@ def end_window(self, name: str, sync_execution: bool = True) -> None: ) if dram_energy < self.min_dram_bucket: warnings.warn( - f"CPU {dram_index} energy {dram_energy} exceeds the minimum bucket value of {self.min_dram_bucket}", + f"DRAM {dram_index} energy {dram_energy} exceeds the minimum bucket value of {self.min_dram_bucket}", stacklevel=1, ) - push_to_gateway(self.prometheus_url, job=self.job, registry=self.registry) + push_to_gateway(self.pushgateway_url, job=self.job, registry=self.registry) class EnergyCumulativeCounter(Metric): @@ -257,18 +250,6 @@ class EnergyCumulativeCounter(Metric): The cumulative nature of the Counter ensures that energy values are always incremented over time, never reset, which is ideal for tracking continuously increasing values like energy usage. - - Attributes: - energy_monitor: The ZeusMonitor instance that collects energy consumption data for the system. - update_period: The interval (in seconds) between consecutive energy data updates. - prometheus_url: The URL of the Prometheus Push Gateway where the Counter metrics will be pushed. - job: The name of the job associated with the energy monitoring in Prometheus. - gpu_counters: A single Prometheus Counter metric for all GPU energy consumption, indexed by window and GPU index. - cpu_counters: A single Prometheus Counter metric for all CPU energy consumption, indexed by window and CPU index. - dram_counters: A single Prometheus Counter metric for all DRAM energy consumption, indexed by window and DRAM index. - queue: A multiprocessing queue used to send signals to start/stop energy monitoring. - proc: A multiprocessing process that runs the energy monitoring loop. - window_state: A dictionary that maps the monitoring window names to their corresponding process state. """ def __init__( @@ -276,7 +257,7 @@ def __init__( cpu_indices: list, gpu_indices: list, update_period: int, - prometheus_url: str, + pushgateway_url: str, job: str, ) -> None: """Initialize the EnergyCumulativeCounter. @@ -285,16 +266,14 @@ def __init__( cpu_indices (list): List of CPU indices to monitor. gpu_indices (list): List of GPU indices to monitor. update_period: The time interval (in seconds) at which energy measurements are updated. - prometheus_url: The URL for the Prometheus Push Gateway where the metrics will be pushed. + pushgateway_url: The URL for the Prometheus Push Gateway where the metrics will be pushed. job: The name of the job to be associated with the Prometheus metrics. """ self.cpu_indices = cpu_indices self.gpu_indices = gpu_indices self.update_period = update_period - self.prometheus_url = prometheus_url + self.pushgateway_url = pushgateway_url self.job = job - self.queue = None - self.proc = None self.window_state: dict[str, MonitoringProcessState] = {} def begin_window(self, name: str, sync_execution: bool = False) -> None: @@ -307,28 +286,28 @@ def begin_window(self, name: str, sync_execution: bool = False) -> None: name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'. sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False. """ + if sync_execution: + sync_execution_fn(self.gpu_indices) + context = mp.get_context("spawn") - self.queue = context.Queue() - self.proc = context.Process( + queue = context.Queue() + proc = context.Process( target=energy_monitoring_loop, args=( name, - self.queue, + queue, self.cpu_indices, self.gpu_indices, self.update_period, - self.prometheus_url, + self.pushgateway_url, self.job, - sync_execution, ), ) - self.proc.start() - if not self.proc.is_alive(): + proc.start() + if not proc.is_alive(): raise RuntimeError(f"Failed to start monitoring process for {name}.") - self.window_state[name] = MonitoringProcessState( - queue=self.queue, proc=self.proc - ) + self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc) def end_window(self, name: str, sync_execution: bool = False) -> None: """End the energy monitoring window. @@ -340,11 +319,11 @@ def end_window(self, name: str, sync_execution: bool = False) -> None: if name not in self.window_state: raise ValueError(f"No active monitoring process found for '{name}'.") + if sync_execution: + sync_execution_fn(self.gpu_indices) + state = self.window_state.pop(name) - if self.queue is not None: - self.queue.put("stop") - else: - raise RuntimeError("Queue is not initialized") + state.queue.put("stop") state.proc.join(timeout=20) if state.proc.is_alive(): @@ -357,9 +336,8 @@ def energy_monitoring_loop( cpu_indices: list, gpu_indices: list, update_period: int, - prometheus_url: str, + pushgateway_url: str, job: str, - sync_execution: bool, ) -> None: """Runs in a separate process to collect and update energy consumption metrics (for GPUs, CPUs, and DRAM). @@ -369,7 +347,7 @@ def energy_monitoring_loop( cpu_indices (list): List of CPU indices to monitor. gpu_indices (list): List of GPU indices to monitor. update_period (int): The interval (in seconds) between consecutive energy data updates. - prometheus_url (str): The URL of the Prometheus Push Gateway where the metrics will be pushed. + pushgateway_url (str): The URL of the Prometheus Push Gateway where the metrics will be pushed. job (str): The name of the Prometheus job associated with these metrics. sync_execution (bool): Whether to execute monitoring synchronously. """ @@ -406,10 +384,12 @@ def energy_monitoring_loop( if not pipe.empty(): break # Begin and end monitoring window using sync_execution - energy_monitor.begin_window(f"__EnergyCumulativeCounter_{name}", sync_execution) + energy_monitor.begin_window( + f"__EnergyCumulativeCounter_{name}", sync_execution=False + ) time.sleep(update_period) measurement = energy_monitor.end_window( - f"__EnergyCumulativeCounter_{name}", sync_execution + f"__EnergyCumulativeCounter_{name}", sync_execution=False ) if measurement.gpu_energy: @@ -427,7 +407,7 @@ def energy_monitoring_loop( if dram_counters: dram_counters.labels(window=name, index=dram_index).inc(energy) # Push metrics to Prometheus - push_to_gateway(prometheus_url, job=job, registry=registry) + push_to_gateway(pushgateway_url, job=job, registry=registry) class PowerGauge(Metric): @@ -437,23 +417,13 @@ class PowerGauge(Metric): The Gauge metric type is suitable for tracking values that can go up and down over time, like power consumption. Power usage data is collected at regular intervals and pushed to a Prometheus Push Gateway for monitoring. - - Attributes: - gpu_indices: List of GPU indices to monitor for power consumption. - update_period: Time interval (in seconds) between consecutive power measurements. - prometheus_url: URL of the Prometheus Push Gateway where Gauge metrics are pushed. - job: Name of the Prometheus job associated with the power metrics. - gpu_gauges: A single Prometheus Gauge metrics for real-time power consumption tracking. - queue: Queue for controlling the monitoring process. - proc: Process running the power monitoring loop. - window_state: A dictionary mapping monitoring window names to their process state. """ def __init__( self, gpu_indices: list, update_period: int, - prometheus_url: str, + pushgateway_url: str, job: str, ) -> None: """Initialize the PowerGauge metric. @@ -461,12 +431,12 @@ def __init__( Args: gpu_indices (list[int]): List of GPU indices to monitor for power consumption. update_period (int): Interval (in seconds) between consecutive power measurements. - prometheus_url (str): URL of the Prometheus Push Gateway where Gauge metrics are pushed. + pushgateway_url (str): URL of the Prometheus Push Gateway where Gauge metrics are pushed. job (str): Name of the Prometheus job to associate with the power metrics. """ self.gpu_indices = gpu_indices self.update_period = update_period - self.prometheus_url = prometheus_url + self.pushgateway_url = pushgateway_url self.job = job self.window_state: dict[str, MonitoringProcessState] = {} @@ -484,28 +454,29 @@ def begin_window(self, name: str, sync_execution: bool = False) -> None: if name in self.window_state: raise ValueError(f"PowerGauge metric '{name}' already exists.") + if sync_execution: + sync_execution_fn(self.gpu_indices) + context = mp.get_context("spawn") - self.queue = context.Queue() - self.proc = context.Process( + queue = context.Queue() + proc = context.Process( target=power_monitoring_loop, args=( name, - self.queue, + queue, self.gpu_indices, self.update_period, - self.prometheus_url, + self.pushgateway_url, self.job, ), ) - self.proc.start() - if not self.proc.is_alive(): + proc.start() + if not proc.is_alive(): raise RuntimeError( f"Failed to start power monitoring process for '{name}'." ) - self.window_state[name] = MonitoringProcessState( - queue=self.queue, proc=self.proc - ) + self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc) def end_window(self, name: str, sync_execution: bool = False) -> None: """End the power monitoring window. @@ -514,12 +485,13 @@ def end_window(self, name: str, sync_execution: bool = False) -> None: name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'. sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False. """ + if sync_execution: + sync_execution_fn(self.gpu_indices) + state = self.window_state.pop(name) - if self.queue is not None: - self.queue.put("stop") - else: - raise RuntimeError("Queue is not initialized") + state.queue.put("stop") state.proc.join(timeout=20) + if state.proc.is_alive(): state.proc.terminate() @@ -529,7 +501,7 @@ def power_monitoring_loop( pipe: mp.Queue, gpu_indices: list[int], update_period: int, - prometheus_url: str, + pushgateway_url: str, job: str, ) -> None: """Runs in a separate process and periodically collects power consumption data for each GPU and pushes the results to the Prometheus Push Gateway. @@ -539,7 +511,7 @@ def power_monitoring_loop( pipe (multiprocessing.Queue): Queue to receive control signals (e.g., "stop"). gpu_indices (list[int]): List of GPU indices to monitor for power consumption. update_period (int): Interval (in seconds) between consecutive power data polls. - prometheus_url (str): URL of the Prometheus Push Gateway where metrics are pushed. + pushgateway_url (str): URL of the Prometheus Push Gateway where metrics are pushed. job (str): Name of the Prometheus job to associate with the metrics. """ power_monitor = PowerMonitor(gpu_indices=gpu_indices) @@ -566,7 +538,7 @@ def power_monitoring_loop( print(f"Error during processing power measurement: {e}") try: - push_to_gateway(prometheus_url, job=job, registry=registry) + push_to_gateway(pushgateway_url, job=job, registry=registry) except Exception as e: print(f"Error pushing metrics: {e}")