diff --git a/.gitignore b/.gitignore index d1b3725cb..a589733f5 100644 --- a/.gitignore +++ b/.gitignore @@ -169,4 +169,5 @@ version.txt actions-runner/ experiments/ -examples/ \ No newline at end of file +examples/ +results/ \ No newline at end of file diff --git a/optimum_benchmark/backends/base.py b/optimum_benchmark/backends/base.py index 7ffcbf97d..32411e293 100644 --- a/optimum_benchmark/backends/base.py +++ b/optimum_benchmark/backends/base.py @@ -12,23 +12,19 @@ ClassVar, Dict, Generic, - List, Optional, Union, ) import numpy as np -import torch from optimum.exporters import TasksManager from transformers import AutoConfig, AutoProcessor if TYPE_CHECKING: - from datasets import Dataset from transformers import ( Pipeline, PretrainedConfig, PreTrainedModel, - TrainerCallback, TrainerState, ) from transformers.utils import ModelOutput @@ -37,10 +33,7 @@ from ..task_utils import DIFFUSION_TASKS, TEXT_GENERATION_TASKS from .config import BackendConfigT -from .isolation_utils import ( - only_this_process_is_running_on_cuda_devices, - only_this_process_will_run_on_cuda_devices, -) +from .isolation_utils import check_cuda_continuous_isolation from .utils import ( extract_shapes_from_diffusion_pipeline, extract_shapes_from_model_artifacts, @@ -48,38 +41,31 @@ LOGGER = getLogger("backend") -CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) -if CUDA_VISIBLE_DEVICES is not None: - CUDA_DEVICES = list(map(int, CUDA_VISIBLE_DEVICES.split(","))) -elif torch.cuda.is_available(): - CUDA_DEVICES = list(range(torch.cuda.device_count())) -else: - CUDA_DEVICES = [] - class Backend(Generic[BackendConfigT], ABC): NAME: ClassVar[str] - # instance variables without default values https://stackoverflow.com/a/44962662 + library: str + model_type: str config: BackendConfigT + isolation_thread: Optional[Process] pretrained_model: Union["PreTrainedModel", "Pipeline"] pretrained_processor: Optional["PreTrainedProcessor"] pretrained_config: Optional["PretrainedConfig"] + automodel_class: Callable[..., "PreTrainedModel"] def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any]): self.task = task self.model = model + self.device = device self.hub_kwargs = hub_kwargs - self.device = torch.device(device) if self.is_diffusion_pipeline(): - # for pipelines self.library = "diffusers" self.model_type = self.task self.pretrained_config = None self.pretrained_processor = None else: - # for models self.library = "transformers" self.pretrained_config = AutoConfig.from_pretrained( pretrained_model_name_or_path=self.model, **self.hub_kwargs @@ -87,8 +73,8 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.model_type = self.pretrained_config.model_type try: - # the processor sometimes contains information about the model's - # input shapes that's not available in the config + # sometimes contains information about the model's + # input shapes that're not available in the config self.pretrained_processor = AutoProcessor.from_pretrained( pretrained_model_name_or_path=self.model, **self.hub_kwargs ) @@ -98,7 +84,10 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.pretrained_processor = None self.automodel_class = TasksManager.get_model_class_for_task( - task=self.task, library=self.library, model_type=self.model_type + framework="pt", # TODO: make this configurable to add support for other frameworks + task=self.task, + library=self.library, + model_type=self.model_type, ) def is_text_generation_model(self) -> 
bool: @@ -112,71 +101,46 @@ def configure(self, config: BackendConfigT) -> None: self.config = config # isolation options - if self.config.initial_isolation_check: - self.check_initial_isolation() - if self.config.continous_isolation_check: + if self.config.continuous_isolation: + LOGGER.info("\t+ Checking continuous device(s) isolation") self.check_continuous_isolation() - # seeding backend - LOGGER.info(f"\t+ Seeding backend with seed {self.config.seed}") - self.seed() - # clean up options if self.config.delete_cache: LOGGER.info("\t+ Model cache will be deleted after benchmark") - def check_initial_isolation(self) -> None: - if self.device.type == "cuda": - LOGGER.info(f"\t+ Checking initial device(s) isolation of CUDA device(s): {CUDA_DEVICES}") - only_this_process_is_running_on_cuda_devices(cuda_devices=CUDA_DEVICES, benchmark_pid=os.getpid()) - def check_continuous_isolation(self) -> None: - if self.device.type == "cuda": - LOGGER.info(f"\t+ Checking continuous device(s) isolation of CUDA device(s): {CUDA_DEVICES}") + if self.device == "cuda": self.isolation_thread = Process( - target=only_this_process_will_run_on_cuda_devices, - kwargs={"cuda_devices": CUDA_DEVICES, "benchmark_pid": os.getpid()}, + target=check_cuda_continuous_isolation, + kwargs={"isolated_pid": os.getpid()}, daemon=True, ) self.isolation_thread.start() def seed(self) -> None: - # https://pytorch.org/docs/stable/notes/randomness.html random.seed(self.config.seed) np.random.seed(self.config.seed) - torch.manual_seed(self.config.seed) def prepare_input(self, input: Dict[str, Any]) -> Dict[str, Any]: if self.is_diffusion_pipeline(): - # diffusion pipelines takes a list of strings - return input + return input # diffusion pipelines takes a list of strings else: - # models expect tensors on the target device for key, value in input.items(): - input[key] = value.to(self.device) + input[key] = value.to(self.device) # models expect tensors on the target device return input def prepare_for_inference(self, **kwargs) -> None: pass - # # symbolic tracing in transformers requires input names - # def prepare_for_profiling(self, input_names: List[str]) -> Dict[str, Any]: - # pass - def forward(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": return self.pretrained_model(**input, **kwargs) def generate(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": return self.pretrained_model.generate(**input, **kwargs) - def train( - self, - training_dataset: "Dataset", - training_arguments: Dict[str, Any], - training_callbacks: List["TrainerCallback"], - training_data_collator: Callable, - ) -> "TrainerState": + def train(self, **kwargs) -> "TrainerState": raise NotImplementedError("Backend must implement train method") @property @@ -197,7 +161,7 @@ def delete_pretrained_model(self) -> None: if hasattr(self, "pretrained_model"): LOGGER.info("\t+ Deleting pretrained model") del self.pretrained_model - gc.collect() + gc.collect() def delete_model_cache(self) -> None: LOGGER.info("\t+ Deleting model cache") diff --git a/optimum_benchmark/backends/config.py b/optimum_benchmark/backends/config.py index 4943f3955..e7427c821 100644 --- a/optimum_benchmark/backends/config.py +++ b/optimum_benchmark/backends/config.py @@ -17,8 +17,7 @@ class BackendConfig(ABC): intra_op_num_threads: Optional[int] = None # isolation options - initial_isolation_check: bool = True - continous_isolation_check: bool = True + continuous_isolation: bool = True # clean up options delete_cache: bool = False diff --git 
a/optimum_benchmark/backends/ddp_utils.py b/optimum_benchmark/backends/ddp_utils.py deleted file mode 100644 index 4fa5cc747..000000000 --- a/optimum_benchmark/backends/ddp_utils.py +++ /dev/null @@ -1,97 +0,0 @@ -# TODO: this can be reformulated as a subclass of backend, from which pytorch and onnxruntime and any other backend - -import logging.config -import os -from logging import getLogger -from typing import TYPE_CHECKING, Optional - -from omegaconf import OmegaConf - -if TYPE_CHECKING: - from transformers import TrainerState - -from ..import_utils import is_torch_distributed_available - -# from launchConfig in https://github.com/pytorch/pytorch/blob/v2.0.0/torch/distributed/launcher/api.py#L29 adjusted -# to defaults of torch.distributed.run in https://github.com/pytorch/pytorch/blob/v2.0.0/torch/distributed/run.py#L770 -DDP_CONFIG = { - "min_nodes": 1, - "max_nodes": 1, - "run_id": "none", - "nproc_per_node": "${device_count:}", - "role": "default", - "rdzv_endpoint": "127.0.0.1:29500", - "rdzv_backend": "static", - "rdzv_configs": { - "timeout": 900, - "rank": 0, - }, - "max_restarts": 0, - "monitor_interval": 5, - "start_method": "spawn", - "log_dir": None, - "metrics_cfg": {}, - "local_addr": None, -} - - -def get_worker_logger(name: Optional[str] = None, log_all: bool = False) -> logging.Logger: - """ - PyTorch DDP subprocesses do not inherit from Hydra logger. - Thus, we need to reconfigure the logger for the workers. - """ - if os.environ["RANK"] == "0" or log_all: - # TODO: also configure logging for other ranks - hydra_conf = OmegaConf.load(".hydra/hydra.yaml") - logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) - - return getLogger(name) - - -def training_worker(args) -> "TrainerState": - dataset_format = args[0] - backend_logger = args[1] - trainer_class = args[2] - training_arguments_class = args[3] - use_ddp = args[4] - training_dataset = args[5] - training_arguments = args[6] - training_data_collator = args[7] - training_callbacks = args[8] - pretrained_model = args[9] - - if use_ddp: - LOGGER_WORKER = get_worker_logger("pytorch-ddp-worker", log_all=False) - env_variables = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "TORCHELASTIC_MAX_RESTARTS"] - LOGGER_WORKER.info("Initializing DDP worker") - for env_var in env_variables: - LOGGER_WORKER.info(f"{env_var}: {os.environ.get(env_var)}") - else: - LOGGER_WORKER = backend_logger - - LOGGER_WORKER.info(f"\t+ Setting dataset format to `{dataset_format}`.") - training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) - LOGGER_WORKER.info("\t+ Wrapping training arguments with transformers.TrainingArguments") - training_arguments = training_arguments_class(**training_arguments) - LOGGER_WORKER.info("\t+ Wrapping model with transformers.Trainer") - trainer = trainer_class( - model=pretrained_model, - args=training_arguments, - callbacks=training_callbacks, - train_dataset=training_dataset, - data_collator=training_data_collator, - ) - LOGGER_WORKER.info("\t+ Starting training") - trainer.train() - LOGGER_WORKER.info("\t+ Training finished successfully") - return trainer.state - - -# a conditional decorator that is only applied if torch.distributed.elastic.multiprocessing.errors.record is available -def record_if_available(func): - if is_torch_distributed_available(): - from torch.distributed.elastic.multiprocessing.errors import record - - return record(func) - else: - return func diff --git a/optimum_benchmark/backends/isolation_utils.py 
b/optimum_benchmark/backends/isolation_utils.py index 80236e3e9..dca655e94 100644 --- a/optimum_benchmark/backends/isolation_utils.py +++ b/optimum_benchmark/backends/isolation_utils.py @@ -7,12 +7,12 @@ from ..import_utils import is_amdsmi_available, is_py3nvml_available, torch_version -def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchmark_pid: int) -> None: +def check_cuda_isolation(devices_ids: List[int], isolated_pid: int) -> None: """ Raises a RuntimeError if any process other than the benchmark process is running on the specified CUDA devices. """ pids: Dict[int, set] = {} - for device_id in cuda_devices: + for device_id in devices_ids: pids[device_id] = set() if is_nvidia_system(): @@ -24,7 +24,7 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm import py3nvml.py3nvml as nvml nvml.nvmlInit() - for device_id in cuda_devices: + for device_id in devices_ids: device_handle = nvml.nvmlDeviceGetHandleByIndex(device_id) device_processes = nvml.nvmlDeviceGetComputeRunningProcesses(device_handle) for device_process in device_processes: @@ -46,7 +46,7 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm if rocm_version >= "5.7": # starting from rocm 5.7, the api seems to have changed names devices_handles = smi.amdsmi_get_processor_handles() - for device_id in cuda_devices: + for device_id in devices_ids: device_handle = devices_handles[device_id] processes_handles = smi.amdsmi_get_gpu_process_list(device_handle) for process_handle in processes_handles: @@ -56,7 +56,7 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm pids[device_id].add(info["pid"]) else: devices_handles = smi.amdsmi_get_device_handles() - for device_id in cuda_devices: + for device_id in devices_ids: device_handle = devices_handles[device_id] processes_handles = smi.amdsmi_get_process_list(device_handle) for process_handle in processes_handles: @@ -70,43 +70,27 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm raise ValueError("check_no_process_is_running_on_cuda_device is only supported on NVIDIA and AMD GPUs.") all_pids = set() - for device_id in cuda_devices: + for device_id in devices_ids: all_pids |= pids[device_id] - other_pids = all_pids - {benchmark_pid} + other_pids = all_pids - {isolated_pid} if len(other_pids) > 0: - error_message = f"Expected only process {benchmark_pid} on device(s) {cuda_devices}, but found {other_pids}." - # for pid in other_pids: - # error_message += f"\nProcess {pid} info: {get_pid_info(pid)}" + error_message = f"Expected only process {isolated_pid} on device(s) {devices_ids}, but found {other_pids}." raise RuntimeError(error_message) -def only_this_process_will_run_on_cuda_devices(cuda_devices: List[int], benchmark_pid: int) -> None: +def check_cuda_continuous_isolation(devices_ids: List[int], isolated_pid: int) -> None: """ Kills the benchmark process if any other process is running on the specified CUDA devices. 
""" + + CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) + devices_ids = [int(device_id) for device_id in CUDA_VISIBLE_DEVICES.split(",")] + while True: try: - only_this_process_is_running_on_cuda_devices(cuda_devices, benchmark_pid) + check_cuda_isolation(devices_ids, isolated_pid) time.sleep(0.1) except Exception as exception: - os.kill(benchmark_pid, signal.SIGTERM) + os.kill(isolated_pid, signal.SIGTERM) raise exception - - -## we can report more information about the process to explain the source of the error -## but that might be dangerous in a CI context - -# import psutil - -# def get_pid_info(pid: int) -> Dict[str, str]: -# """Returns a dictionary containing the process' information.""" - -# process = psutil.Process(pid) - -# return { -# "pid": pid, -# "name": process.name(), -# "username": process.username(), -# "cmdline": " ".join(process.cmdline()), -# } diff --git a/optimum_benchmark/backends/neural_compressor/backend.py b/optimum_benchmark/backends/neural_compressor/backend.py index 7d76f1380..ca8ee1edd 100644 --- a/optimum_benchmark/backends/neural_compressor/backend.py +++ b/optimum_benchmark/backends/neural_compressor/backend.py @@ -31,8 +31,8 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any ) def validate_device(self) -> None: - if self.device.type != "cpu": - raise ValueError(f"INCBackend only supports CPU devices, got {self.device.type}") + if self.device != "cpu": + raise ValueError(f"INCBackend only supports CPU devices, got {self.device}") def validate_task(self) -> None: if self.task not in TASKS_TO_INCMODELS: diff --git a/optimum_benchmark/backends/onnxruntime/backend.py b/optimum_benchmark/backends/onnxruntime/backend.py index 84b1c759c..a39afd463 100644 --- a/optimum_benchmark/backends/onnxruntime/backend.py +++ b/optimum_benchmark/backends/onnxruntime/backend.py @@ -29,7 +29,6 @@ from ...profilers.ort_profiler import ORTProfilingWrapper from ..base import Backend -from ..ddp_utils import record_if_available, training_worker from ..optimum_utils import main_export from ..pytorch.utils import randomize_weights from .config import ORTConfig @@ -46,17 +45,9 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.validate_device() self.validate_task() - if self.is_diffusion_pipeline(): - self.ortmodel_class = get_class(TASKS_TO_ORTSD[self.task]) - elif self.task in TASKS_TO_ORTMODELS: - self.ortmodel_class = TASKS_TO_ORTMODELS[self.task] - - ortmodel_name = self.ortmodel_class.__name__ - LOGGER.info(f"Inferred ORTModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") - def validate_device(self) -> None: - if self.device.type not in ["cpu", "cuda"]: - raise ValueError(f"ORTBackend only supports CPU and CUDA devices, got {self.device.type}") + if self.device not in ["cpu", "cuda"]: + raise ValueError(f"ORTBackend only supports CPU and CUDA devices, got {self.device}") def validate_task(self) -> None: if self.task not in TASKS_TO_ORTMODELS and self.task not in TASKS_TO_ORTSD: @@ -65,6 +56,14 @@ def validate_task(self) -> None: def configure(self, config: ORTConfig) -> None: super().configure(config) + if self.is_diffusion_pipeline(): + self.ortmodel_class = get_class(TASKS_TO_ORTSD[self.task]) + elif self.task in TASKS_TO_ORTMODELS: + self.ortmodel_class = TASKS_TO_ORTMODELS[self.task] + + ortmodel_name = self.ortmodel_class.__name__ + LOGGER.info(f"Inferred ORTModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") + # Process 
torch dtype self.torch_dtype = getattr(torch, self.config.torch_dtype) if self.config.torch_dtype is not None else None @@ -149,12 +148,11 @@ def load_automodel_from_config(self) -> None: def load_automodel_from_pretrained(self) -> None: LOGGER.info("\t+ Loading AutoModel from pretrained") - with self.device: - self.pretrained_model = self.automodel_class.from_pretrained( - self.model, - torch_dtype=self.torch_dtype, - **self.hub_kwargs, - ) + self.pretrained_model = self.automodel_class.from_pretrained( + self.model, + torch_dtype=self.torch_dtype, + **self.hub_kwargs, + ).to(self.device) def load_ortmodel(self) -> None: LOGGER.info("\t+ Loading ORTModel") @@ -189,7 +187,7 @@ def export_automodel(self) -> None: self.model, output=exported_model_dir, task=self.export_task, - device=self.device.type, + device=self.device, fp16=self.torch_dtype == torch.float16, **self.hub_kwargs, # we hijack the model instantiation and use our random weights model @@ -213,12 +211,12 @@ def optimize_onnx_files(self) -> None: if self.config.auto_optimization is not None: optimization_config = AutoOptimizationConfig.with_optimization_level( optimization_level=self.config.auto_optimization, - for_gpu=self.device.type == "cuda", + for_gpu=self.device == "cuda", **self.config.auto_optimization_config, ) elif self.config.optimization: optimization_config = OptimizationConfig( - optimize_for_gpu=self.device.type == "cuda", **self.config.optimization_config + optimize_for_gpu=self.device == "cuda", **self.config.optimization_config ) LOGGER.info("\t+ Creating optimizer") optimizer = ORTOptimizer.from_pretrained(self.model, file_names=self.onnx_files_names) @@ -277,7 +275,7 @@ def quantize_onnx_files(self) -> None: dataset=calibration_dataset, calibration_config=calibration_config, operators_to_quantize=quantization_config.operators_to_quantize, - use_gpu=self.device.type == "cuda", + use_gpu=self.device == "cuda", # TODO: add support for these batch_size=1, use_external_data_format=False, @@ -332,39 +330,31 @@ def prepare_for_profiling(self, input_names: List[str]) -> None: LOGGER.info("\t+ Wrapping model inside profiler") self.pretrained_model = ORTProfilingWrapper(self.pretrained_model) - @record_if_available def train( self, training_dataset: "Dataset", training_arguments: Dict[str, Any], training_callbacks: List["TrainerCallback"], training_data_collator: Callable, + dataset_format: str = "torch", ) -> "TrainerState": - worker_args = ( - "torch", - LOGGER, - ORTTrainer, - ORTTrainingArguments, - self.config.use_ddp, - training_dataset, - training_arguments, - training_data_collator, - training_callbacks, - self.pretrained_model, + LOGGER.info(f"\t+ Setting dataset format to `{dataset_format}`.") + training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) + LOGGER.info("\t+ Wrapping training arguments with optimum.onnxruntime.ORTTrainingArguments") + training_arguments = ORTTrainingArguments(**training_arguments) + LOGGER.info("\t+ Wrapping model with optimum.onnxruntime.ORTTrainer") + trainer = ORTTrainer( + model=self.pretrained_model, + args=training_arguments, + callbacks=training_callbacks, + train_dataset=training_dataset, + data_collator=training_data_collator, ) + LOGGER.info("\t+ Starting training") + trainer.train() + LOGGER.info("\t+ Training finished successfully") - if self.config.use_ddp: - from torch.distributed.launcher.api import LaunchConfig, elastic_launch - - # For DDP, we log only the state of the first rank as transformers does. 
- # since the batch size used in measuring the throughput is the one of world size. - ddp_config = LaunchConfig(**self.config.ddp_config) - results = elastic_launch(config=ddp_config, entrypoint=training_worker)(worker_args)[0] - else: - # For DP, we can still use training_worker, simply not wrapped by the elastic_launch class. - results = training_worker(worker_args) - - return results + return trainer.state def clean(self) -> None: super().clean() @@ -372,6 +362,7 @@ def clean(self) -> None: if hasattr(self, "tmpdir"): self.tmpdir.cleanup() - if self.device.type == "cuda": + if self.device == "cuda": torch.cuda.empty_cache() - gc.collect() + + gc.collect() diff --git a/optimum_benchmark/backends/onnxruntime/config.py b/optimum_benchmark/backends/onnxruntime/config.py index 7ea41d3ed..10c7b79fd 100644 --- a/optimum_benchmark/backends/onnxruntime/config.py +++ b/optimum_benchmark/backends/onnxruntime/config.py @@ -6,41 +6,10 @@ from ...import_utils import onnxruntime_version from ..config import BackendConfig -from ..ddp_utils import DDP_CONFIG from ..peft_utils import PEFT_CONFIGS, PEFT_TASKS_TYPES - -def infer_device_id(device: str) -> int: - """Infer the device id from the given device string.""" - if "cuda" in device: - if ":" in device: - # either CUDA_VISIBLE_DEVICES is set or device is set to cuda:0 - return int(device.split(":")[1]) - else: - # device is set to cuda - return 0 - elif device == "cpu": - return -1 - else: - raise ValueError(f"Unknown device: {device}") - - -DEVICE_PROVIDER_MAP = { - "cpu": "CPUExecutionProvider", - "cuda": "CUDAExecutionProvider", -} - -OmegaConf.register_new_resolver("onnxruntime_version", onnxruntime_version) -OmegaConf.register_new_resolver("infer_device_id", lambda device: infer_device_id(device)) -OmegaConf.register_new_resolver("infer_provider", lambda device: DEVICE_PROVIDER_MAP[device]) -OmegaConf.register_new_resolver("is_profiling", lambda benchmark_name: benchmark_name == "profiling") -OmegaConf.register_new_resolver( - "io_bind", lambda provider: provider in ["CPUExecutionProvider", "CUDAExecutionProvider"] -) - - OPTIMIZATION_CONFIG = { - "optimization_level": 1, # 0, 1, 2, 99 + "optimization_level": 1, "fp16": False, "enable_transformers_specific_optimizations": True, "enable_gelu_approximation": False, @@ -100,6 +69,14 @@ def infer_device_id(device: str) -> int: "trt_engine_cache_path": "tmp/trt_cache", } +DEVICE_PROVIDER_MAP = {"cpu": "CPUExecutionProvider", "cuda": "CUDAExecutionProvider"} +IO_BINDING_PROVIDERS = ["CPUExecutionProvider", "CUDAExecutionProvider"] + +OmegaConf.register_new_resolver("onnxruntime_version", onnxruntime_version) +OmegaConf.register_new_resolver("infer_provider", lambda device: DEVICE_PROVIDER_MAP[device]) +OmegaConf.register_new_resolver("is_profiling", lambda benchmark_name: benchmark_name == "profiling") +OmegaConf.register_new_resolver("ort_io_binding", lambda provider: provider in IO_BINDING_PROVIDERS) + @dataclass class ORTConfig(BackendConfig): @@ -117,10 +94,10 @@ class ORTConfig(BackendConfig): # provider options provider: str = "${infer_provider:${device}}" - provider_options: Dict[str, Any] = field(default_factory=lambda: {"device_id": "${infer_device_id:${device}}"}) + provider_options: Dict[str, Any] = field(default_factory=lambda: {}) # inference options - use_io_binding: bool = "${io_bind:${device}}" + use_io_binding: bool = "${ort_io_binding:${device}}" session_options: Dict[str, Any] = field( default_factory=lambda: {"enable_profiling": "${is_profiling:${benchmark.name}}"} ) @@ -148,17 
+125,11 @@ class ORTConfig(BackendConfig): # ort-training is basically a different package so we might need to separate these two backends in the future use_inference_session: bool = "${is_inference:${benchmark.name}}" - # training options - use_ddp: bool = False - ddp_config: Dict[str, Any] = field(default_factory=dict) - # peft options peft_strategy: Optional[str] = None peft_config: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): - CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) - if not self.no_weights and not self.export and self.torch_dtype is not None: raise NotImplementedError("Can't convert an exported model's weights to a different dtype.") @@ -196,15 +167,6 @@ def __post_init__(self): if self.calibration: self.calibration_config = OmegaConf.to_object(OmegaConf.merge(CALIBRATION_CONFIG, self.calibration_config)) - if self.use_ddp: - if CUDA_VISIBLE_DEVICES is None: - raise ValueError("`use_ddp` can only be used when CUDA_VISIBLE_DEVICES is set.") - - self.ddp_config = OmegaConf.to_object(OmegaConf.merge(DDP_CONFIG, self.ddp_config)) - # TODO: check if it's not possible to use DDP with multiple nodes - if self.ddp_config["max_nodes"] > 1 or self.ddp_config["min_nodes"] > 1: - raise NotImplementedError("Currently, PyTorch DDP benchmark only supports training on a single node.") - if self.peft_strategy is not None: if self.peft_strategy not in PEFT_CONFIGS: raise ValueError( diff --git a/optimum_benchmark/backends/openvino/backend.py b/optimum_benchmark/backends/openvino/backend.py index f118544c3..fc88559ab 100644 --- a/optimum_benchmark/backends/openvino/backend.py +++ b/optimum_benchmark/backends/openvino/backend.py @@ -23,21 +23,23 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.validate_device() self.validate_task() - self.ovmodel_class = get_class(TASKS_TO_OVMODEL[self.task]) - ortmodel_name = self.ovmodel_class.__name__ - LOGGER.info(f"Inferred OVModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") - def validate_task(self) -> None: if self.task not in TASKS_TO_OVMODEL: raise NotImplementedError(f"OVBackend does not support task {self.task}") def validate_device(self) -> None: - if self.device.type != "cpu": - raise ValueError(f"OVBackend only supports CPU devices, got {self.device.type}") + if self.device != "cpu": + raise ValueError(f"OVBackend only supports CPU devices, got {self.device}") def configure(self, config: OVConfig) -> None: super().configure(config) + self.ovmodel_class = get_class(TASKS_TO_OVMODEL[self.task]) + ortmodel_name = self.ovmodel_class.__name__ + LOGGER.info( + f"\t+ Inferred OVModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}" + ) + self.openvino_config = self.config.openvino_config.copy() if self.config.inter_op_num_threads is not None: LOGGER.info(f"\t+ Setting inter_op_num_threads to {self.config.inter_op_num_threads}") diff --git a/optimum_benchmark/backends/pytorch/backend.py b/optimum_benchmark/backends/pytorch/backend.py index 20aaec108..866dd6e12 100644 --- a/optimum_benchmark/backends/pytorch/backend.py +++ b/optimum_benchmark/backends/pytorch/backend.py @@ -1,28 +1,24 @@ -import gc -import logging +import os from logging import getLogger from typing import TYPE_CHECKING, Any, Callable, Dict, List import torch -from transformers.utils.fx import symbolic_trace + +if torch.distributed.is_available(): + import torch.distributed if TYPE_CHECKING: from datasets import Dataset from transformers import 
TrainerCallback, TrainerState from transformers.utils import ModelOutput -from ...profilers.fx_profiler import FXProfilingWrapper from ..base import Backend -from ..ddp_utils import record_if_available, training_worker from .config import PyTorchConfig from .utils import DTYPES_MAPPING, randomize_weights, to_pow2 # bachend logger LOGGER = getLogger("pytorch") -# disable numexpr.utils logger -getLogger("numexpr.utils").setLevel(logging.CRITICAL) - class PyTorchBackend(Backend[PyTorchConfig]): NAME: str = "pytorch" @@ -30,12 +26,18 @@ class PyTorchBackend(Backend[PyTorchConfig]): def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any]): super().__init__(model, task, device, hub_kwargs) - automodel = self.automodel_class.__name__ - LOGGER.info(f"Inferred AutoModel class {automodel} for task {self.task} and model_type {self.model_type}") - def configure(self, config: PyTorchConfig) -> None: super().configure(config) + automodel = self.automodel_class.__name__ + LOGGER.info(f"\t+ Inferred AutoModel class {automodel} for task {self.task} and model_type {self.model_type}") + + # for now we rely on this env variable to know if we're in a distributed setting + if os.environ.get("WORLD_SIZE", None) is not None: + LOGGER.info(f"\t+ Detected world size: {os.environ['WORLD_SIZE']}") + LOGGER.info(f"\t+ Setting device to its corresponding local rank: {os.environ['LOCAL_RANK']}") + torch.cuda.set_device(int(os.environ.get("LOCAL_RANK", None))) + # Gradients options if self.config.disable_grad: LOGGER.info("\t+ Disabling gradients") @@ -98,6 +100,14 @@ def configure(self, config: PyTorchConfig) -> None: peft_config = peft_config_class(**self.config.peft_config) self.pretrained_model = get_peft_model(self.pretrained_model, peft_config=peft_config) + if self.config.deepspeed_inference: + LOGGER.info("\t+ Using DeepSpeed Inference") + from deepspeed import init_inference + + self.pretrained_model = init_inference( + self.pretrained_model, config=self.config.deepspeed_inference_config + ) + def load_model_from_pretrained(self) -> None: # iniline quantization or quantization config modification if self.config.quantization_scheme == "gptq": @@ -144,29 +154,22 @@ def load_model_from_pretrained(self) -> None: **self.automodel_kwargs, **self.hub_kwargs, ) - elif hasattr(self.pretrained_config, "quantization_config") or self.quantization_config is not None: - LOGGER.info("\t+ Loading model with low cpu memory usage") + else: + LOGGER.info(f"\t+ Loading model and moving it to device: {self.device}") self.pretrained_model = self.automodel_class.from_pretrained( self.model, - low_cpu_memory_usage=True, torch_dtype=self.torch_dtype, **self.automodel_kwargs, **self.hub_kwargs, ).to(self.device) - else: - LOGGER.info(f"\t+ Loading model directly on device: {self.device}") - with self.device: - self.pretrained_model = self.automodel_class.from_pretrained( - self.model, - torch_dtype=self.torch_dtype, - **self.automodel_kwargs, - **self.hub_kwargs, - ) @property def automodel_kwargs(self) -> Dict[str, Any]: kwargs = {} + if hasattr(self.pretrained_config, "quantization_config") or self.quantization_config is not None: + kwargs["low_cpu_memory_usage"] = True + if self.quantization_config is not None: kwargs["quantization_config"] = self.quantization_config @@ -247,18 +250,11 @@ def prepare_for_inference(self, input_shapes: Dict[str, int], **kwargs) -> None: and self.pretrained_config.quantization_config.desc_act ): LOGGER.info("\t+ Setting GPTQ's max_input_length") - from auto_gptq import 
exllama_set_max_input_length + from auto_gptq import exllama_set_max_input_length # type: ignore max_input_length = to_pow2(input_shapes["batch_size"] * input_shapes["sequence_length"]) self.pretrained_model = exllama_set_max_input_length(self.pretrained_model, max_input_length) - def prepare_for_profiling(self, input_names: List[str]) -> None: - LOGGER.info("Preparing model for profiling") - LOGGER.info("\t+ Symbolicly tracing model") - self.pretrained_model = symbolic_trace(self.pretrained_model, input_names=input_names) - LOGGER.info("\t+ Wrapping model with FXProfilingWrapper") - self.pretrained_model = FXProfilingWrapper(self.pretrained_model) - def forward(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": if self.is_diffusion_pipeline(): return super().forward(input, kwargs) @@ -277,50 +273,38 @@ def generate(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutpu else: return super().generate(input, kwargs) - @record_if_available def train( self, training_dataset: "Dataset", training_arguments: Dict[str, Any], training_callbacks: List["TrainerCallback"], training_data_collator: Callable, + dataset_format: str = "torch", ) -> "TrainerState": from transformers import Trainer, TrainingArguments - worker_args = ( - "torch", - LOGGER, - Trainer, - TrainingArguments, - self.config.use_ddp, - training_dataset, - training_arguments, - training_data_collator, - training_callbacks, - self.pretrained_model, + LOGGER.info(f"\t+ Setting dataset format to `{dataset_format}`.") + training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) + LOGGER.info("\t+ Wrapping training arguments with transformers.TrainingArguments") + training_arguments = TrainingArguments(**training_arguments) + LOGGER.info("\t+ Wrapping model with transformers.Trainer") + trainer = Trainer( + model=self.pretrained_model, + args=training_arguments, + callbacks=training_callbacks, + train_dataset=training_dataset, + data_collator=training_data_collator, ) - if self.config.use_ddp: - from torch.distributed.launcher.api import LaunchConfig, elastic_launch + LOGGER.info("\t+ Starting training") + trainer.train() + LOGGER.info("\t+ Training finished successfully") - # For DDP, we log only the state of the first rank as transformers does. - # since the batch size used in measuring the throughput is the one of world size. - ddp_config = LaunchConfig(**self.config.ddp_config) - results = elastic_launch(config=ddp_config, entrypoint=training_worker)(worker_args)[0] - else: - # For DP, we can still use training_worker, simply not wrapped by the elastic_launch class. 
- results = training_worker(worker_args) - - return results + return trainer.state def seed(self): super().seed() - if self.device.type == "cuda": - torch.cuda.manual_seed_all(self.config.seed) + torch.manual_seed(self.config.seed) - def clean(self) -> None: - super().clean() - - if self.device.type == "cuda": - torch.cuda.empty_cache() - gc.collect() + if self.device == "cuda": + torch.cuda.manual_seed_all(self.config.seed) diff --git a/optimum_benchmark/backends/pytorch/config.py b/optimum_benchmark/backends/pytorch/config.py index ec7e89fba..c66f9eed4 100644 --- a/optimum_benchmark/backends/pytorch/config.py +++ b/optimum_benchmark/backends/pytorch/config.py @@ -7,7 +7,6 @@ from ...env_utils import is_rocm_system from ...import_utils import torch_version from ..config import BackendConfig -from ..ddp_utils import DDP_CONFIG from ..peft_utils import PEFT_CONFIGS, PEFT_TASKS_TYPES OmegaConf.register_new_resolver("device_count", lambda: len(os.environ.get("CUDA_VISIBLE_DEVICES", "").split(","))) @@ -64,9 +63,9 @@ class PyTorchConfig(BackendConfig): quantization_scheme: Optional[str] = None quantization_config: Dict[str, Any] = field(default_factory=dict) - # training options - use_ddp: bool = False - ddp_config: Dict[str, Any] = field(default_factory=dict) + # distributed options + deepspeed_inference: bool = False + deepspeed_inference_config: Dict[str, Any] = field(default_factory=dict) # peft options peft_strategy: Optional[str] = None @@ -103,15 +102,6 @@ def __post_init__(self): OmegaConf.merge(QUANTIZATION_CONFIG, self.quantization_config) ) - if self.use_ddp: - if CUDA_VISIBLE_DEVICES is None: - raise ValueError("`use_ddp` can only be used when CUDA_VISIBLE_DEVICES is set.") - - self.ddp_config = OmegaConf.to_object(OmegaConf.merge(DDP_CONFIG, self.ddp_config)) - # TODO: check if it's not possible to use DDP with multiple nodes - if self.ddp_config["max_nodes"] > 1 or self.ddp_config["min_nodes"] > 1: - raise NotImplementedError("Currently, PyTorch DDP benchmark only supports training on a single node.") - if self.peft_strategy is not None: if self.peft_strategy not in PEFT_CONFIGS: raise ValueError( diff --git a/optimum_benchmark/backends/utils.py b/optimum_benchmark/backends/utils.py index 8897cd15c..997f317cb 100644 --- a/optimum_benchmark/backends/utils.py +++ b/optimum_benchmark/backends/utils.py @@ -38,7 +38,8 @@ def extract_shapes_from_diffusion_pipeline(pipeline: "Pipeline") -> Dict[str, An def extract_shapes_from_model_artifacts( - config: "PretrainedConfig", processor: Optional["PreTrainedProcessor"] = None + config: "PretrainedConfig", + processor: Optional["PreTrainedProcessor"] = None, ) -> Dict[str, Any]: shapes = {} artifacts_dict = {} diff --git a/optimum_benchmark/benchmarks/base.py b/optimum_benchmark/benchmarks/base.py index 4c33ed787..87ea86571 100644 --- a/optimum_benchmark/benchmarks/base.py +++ b/optimum_benchmark/benchmarks/base.py @@ -33,5 +33,8 @@ def configure(self, config: BenchmarkConfigT) -> None: def run(self, backend: "Backend") -> None: raise NotImplementedError("Benchmark must implement run method") - def save(self) -> None: - raise NotImplementedError("Benchmark must implement save method") + def get_results_df(self) -> None: + raise NotImplementedError("Benchmark must implement get_results_df method") + + def save_to_csv(self) -> None: + raise NotImplementedError("Benchmark must implement save_to_csv method") diff --git a/optimum_benchmark/benchmarks/inference/benchmark.py b/optimum_benchmark/benchmarks/inference/benchmark.py index 
7b82efc27..db45dc9db 100644 --- a/optimum_benchmark/benchmarks/inference/benchmark.py +++ b/optimum_benchmark/benchmarks/inference/benchmark.py @@ -285,7 +285,7 @@ def get_results_df(self) -> DataFrame: return DataFrame(results_dict, index=[0]) - def save(self) -> None: - LOGGER.info("Saving inference results") + def save_to_csv(self) -> None: + LOGGER.info("Saving results") results_df = self.get_results_df() results_df.to_csv("inference_results.csv", index=False) diff --git a/optimum_benchmark/benchmarks/inference/config.py b/optimum_benchmark/benchmarks/inference/config.py index 3e4048e63..0da1614e0 100644 --- a/optimum_benchmark/benchmarks/inference/config.py +++ b/optimum_benchmark/benchmarks/inference/config.py @@ -11,10 +11,8 @@ LOGGER = getLogger("inference") -OmegaConf.register_new_resolver("can_generate", lambda task: task in TEXT_GENERATION_TASKS) -OmegaConf.register_new_resolver("can_diffuse", lambda task: task in DIFFUSION_TASKS) - GENERATE_CONFIG = { + "num_return_sequences": 1, "max_new_tokens": 100, "min_new_tokens": 100, "do_sample": False, @@ -27,6 +25,9 @@ "num_images_per_prompt": 1, } +OmegaConf.register_new_resolver("can_generate", lambda task: task in TEXT_GENERATION_TASKS) +OmegaConf.register_new_resolver("can_diffuse", lambda task: task in DIFFUSION_TASKS) + @dataclass class InferenceConfig(BenchmarkConfig): @@ -81,6 +82,8 @@ def __post_init__(self): if self.new_tokens is not None: self.generate_kwargs["max_new_tokens"] = self.new_tokens self.generate_kwargs["min_new_tokens"] = self.new_tokens + else: + self.new_tokens = self.generate_kwargs["min_new_tokens"] if self.energy and os.environ.get("CUDA_VISIBLE_DEVICES", None) and is_rocm_system(): raise ValueError("Energy measurement through codecarbon is not available on RoCm-powered devices.") diff --git a/optimum_benchmark/benchmarks/training/benchmark.py b/optimum_benchmark/benchmarks/training/benchmark.py index 8af586441..11ccf5df0 100644 --- a/optimum_benchmark/benchmarks/training/benchmark.py +++ b/optimum_benchmark/benchmarks/training/benchmark.py @@ -56,7 +56,7 @@ def run(self, backend: "Backend") -> None: def get_results_df(self) -> DataFrame: return DataFrame(self.training_metrics, index=[0]) - def save(self) -> None: + def save_to_csv(self) -> None: LOGGER.info("Saving training results") results_df = self.get_results_df() results_df.to_csv("training_results.csv", index=False) diff --git a/optimum_benchmark/benchmarks/utils.py b/optimum_benchmark/benchmarks/utils.py index 8dffaa60a..2b4984bd2 100644 --- a/optimum_benchmark/benchmarks/utils.py +++ b/optimum_benchmark/benchmarks/utils.py @@ -48,6 +48,7 @@ def on_train_end(self, args: "TrainingArguments", state: "TrainerState", control state.training_end = time.time_ns() * 1e-9 state.overall_training_end = time.time_ns() * 1e-9 + print(args.world_size) state.total_training_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size # warmup metrics diff --git a/optimum_benchmark/cli.py b/optimum_benchmark/cli.py new file mode 100644 index 000000000..771d71151 --- /dev/null +++ b/optimum_benchmark/cli.py @@ -0,0 +1,66 @@ +import glob +import os +import sys +from logging import getLogger + +import hydra +from omegaconf import DictConfig + +from .experiment import run_with_launcher + +LOGGER = getLogger("main") + + +@hydra.main(version_base=None) +# hydra takes care of the cli and returns the config object +def benchmark_cli(experiment: DictConfig) -> None: + if glob.glob("*.csv") and os.environ.get("OVERRIDE_BENCHMARKS", "0") != "1": + 
LOGGER.warning( + "Skipping benchmark because results already exist. " + "Set OVERRIDE_BENCHMARKS=1 to override benchmark results." + ) + return + + # Run the benchmark + benchmark = run_with_launcher(experiment) + # Save results + benchmark.save_to_csv() + + +def report_cli() -> None: + action = sys.argv[1] + sys.argv = sys.argv[1:] + + if action == "gather": + from .aggregators.gather import gather_cli + + gather_cli() + elif action == "display": + from .aggregators.display import display_cli + + display_cli() + elif action == "summarize": + from .aggregators.summarize import summarize_cli + + summarize_cli() + elif action == "plot": + from .aggregators.plot import plot_cli + + plot_cli() + elif action in ["-h", "--help"]: + print( + """ + Usage: optimum-report + Actions: + gather + display + summarize + plot + -h, --help + + For more information on each action, run: + optimum-report -h + """ + ) + else: + raise ValueError(f"Unknown action {action}") diff --git a/optimum_benchmark/experiment.py b/optimum_benchmark/experiment.py index 6b99dca7b..39faf58c9 100644 --- a/optimum_benchmark/experiment.py +++ b/optimum_benchmark/experiment.py @@ -1,13 +1,9 @@ -import glob -import logging.config -import multiprocessing import os import platform from dataclasses import dataclass, field from logging import getLogger from typing import TYPE_CHECKING, Any, Dict, Type -import hydra from hydra.core.config_store import ConfigStore from hydra.utils import get_class from omegaconf import DictConfig, OmegaConf @@ -26,11 +22,15 @@ optimum_version, transformers_version, ) +from .launchers.process.config import ProcessConfig +from .launchers.torchrun.config import TorchrunConfig from .task_utils import infer_task_from_model_name_or_path if TYPE_CHECKING: from .backends.base import Backend from .benchmarks.base import Benchmark + from .launchers.base import Launcher + LOGGER = getLogger("experiment") @@ -39,6 +39,9 @@ @dataclass class ExperimentConfig: + # LAUNCHER CONFIGURATION + launcher: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 + # BACKEND CONFIGURATION backend: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 @@ -46,17 +49,18 @@ class ExperimentConfig: benchmark: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 # EXPERIMENT CONFIGURATION - experiment_name: str + experiment_name: str = "optimum-benchmark" # Model name or path (bert-base-uncased, google/vit-base-patch16-224, ...) - model: str - # Device name or path (cpu, cuda, cuda:0, ...) - device: str + model: str = "bert-base-uncased" # Task name (text-classification, image-classification, ...) task: str = "${infer_task:${model}}" + # Device name or path (cpu, cuda, cuda:0, ...) + device: str = "cuda" # ADDITIONAL MODEL CONFIGURATION: Model revision, use_auth_token, trust_remote_code hub_kwargs: Dict = field( default_factory=lambda: { + "token": None, "revision": "main", "cache_dir": None, "force_download": False, @@ -65,7 +69,6 @@ class ExperimentConfig: ) # ENVIRONMENT CONFIGURATION - # TODO: add gpu info when available environment: Dict = field( default_factory=lambda: { "optimum_version": optimum_version(), @@ -86,22 +89,31 @@ class ExperimentConfig: ) def __post_init__(self) -> None: - # if the number of available GPUs is 1, then we have no problem - # torch and nvidia-smi will both index it as 0, otherwise: + if self.device.startswith("cuda:"): + raise ValueError( + f"Device was specified as {self.device} with a target index." 
+ "We recommend using the main cuda device (`cuda`) and specifying the target index in `CUDA_VISIBLE_DEVICES`." + ) + + if self.device not in ["cuda", "cpu", "mps", "xla"]: + raise ValueError("`device` must be either `cuda`, `cpu`, `mps` or `xla`.") + if "cuda" in self.device and len(self.environment["gpus"]) > 1: - CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) - if CUDA_VISIBLE_DEVICES is None: - raise ValueError( + if os.environ.get("CUDA_VISIBLE_DEVICES", None) is None: + LOGGER.warning( "Multiple GPUs detected but CUDA_VISIBLE_DEVICES is not set. " - "This means that code might allocate resources from GPUs that are not intended to be used. " - "Please set `CUDA_VISIBLE_DEVICES` to the desired GPU ids." + "This means that code might allocate resources from the wrong GPUs. " + "We recommend setting CUDA_VISIBLE_DEVICES to isolate the GPUs that will be used for this experiment. " + "`CUDA_VISIBLE_DEVICES` will be set to `0` to ensure that only the first GPU is used." + "If you want to use multiple GPUs, please set `CUDA_VISIBLE_DEVICES` to the desired GPU indices." ) - CUDA_DEVICE_ORDER = os.environ.get("CUDA_DEVICE_ORDER", None) - if CUDA_DEVICE_ORDER is None or CUDA_DEVICE_ORDER != "PCI_BUS_ID": + os.environ["CUDA_VISIBLE_DEVICES"] = "0" + + if os.environ.get("CUDA_DEVICE_ORDER", None) != "PCI_BUS_ID": LOGGER.warning( - "Multiple GPUs detected but CUDA_DEVICE_ORDER is not set. " - "This means that code might allocate resources from the wrong GPUs even if CUDA_VISIBLE_DEVICES is set. " - "Pytorch uses the `FASTEST_FIRST` order by default, which is not guaranteed to be the same as nvidia-smi. " + "Multiple GPUs detected but CUDA_DEVICE_ORDER is not set to `PCI_BUS_ID`. " + "This means that code might allocate resources from the wrong GPUs even if `CUDA_VISIBLE_DEVICES` is set. " + "For example pytorch uses the `FASTEST_FIRST` order by default, which is not guaranteed to be the same as nvidia-smi. " "`CUDA_DEVICE_ORDER` will be set to `PCI_BUS_ID` to ensure that the GPUs are allocated in the same order as nvidia-smi. " ) os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" @@ -117,16 +129,13 @@ def __post_init__(self) -> None: cs.store(group="backend", name="text-generation-inference", node=TGIConfig) cs.store(group="benchmark", name="inference", node=InferenceConfig) cs.store(group="benchmark", name="training", node=TrainingConfig) +cs.store(group="launcher", name="process", node=ProcessConfig) +cs.store(group="launcher", name="torchrun", node=TorchrunConfig) -def run(experiment: DictConfig) -> None: - # Configure logging - hydra_conf = OmegaConf.load(".hydra/hydra.yaml") - logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) - - # This is required to trigger __post_init__. 
Reference: https://github.com/omry/omegaconf/issues/377 +def run(experiment: "ExperimentConfig") -> "Benchmark": + # Instantiate the experiment config to trigger __post_init__ experiment: ExperimentConfig = OmegaConf.to_object(experiment) - # Save the config OmegaConf.save(experiment, "hydra_config.yaml", resolve=True) # Allocate requested backend @@ -161,8 +170,6 @@ def run(experiment: DictConfig) -> None: try: # Run the benchmark benchmark.run(backend) - # Save the benchmark results - benchmark.save() # Clean up the backend backend.clean() except Exception as e: @@ -170,25 +177,23 @@ def run(experiment: DictConfig) -> None: backend.clean() raise e + return benchmark -def run_isolated(experiment: DictConfig, start_method: str = "spawn") -> None: - # Set the multiprocessing start method if not already set - if multiprocessing.get_start_method(allow_none=True) != start_method: - multiprocessing.set_start_method(start_method) - - # Execute the experiment in a child process - p = multiprocessing.Process(target=run, args=(experiment,)) - p.start() - p.join() - # Exit with the same exit code as the child process - exit(p.exitcode) +def run_with_launcher(experiment: DictConfig) -> "Benchmark": + launcher_factory: Type["Launcher"] = get_class(experiment.launcher._target_) + launcher: "Launcher" = launcher_factory() + try: + launcher.configure(experiment.launcher) + except Exception as e: + LOGGER.error("Error during launcher configuration: %s", e) + raise e -@hydra.main(version_base=None) -def main(experiment: DictConfig) -> None: - if glob.glob("*.csv"): - LOGGER.warning("Skipping because results already exist in experiment directory.") - return + try: + benchmark: Benchmark = launcher.launch(run, experiment) + except Exception as e: + LOGGER.error("Error during experiment execution: %s", e) + raise e - run_isolated(experiment, start_method="spawn") + return benchmark diff --git a/optimum_benchmark/launchers/__init__.py b/optimum_benchmark/launchers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/optimum_benchmark/launchers/base.py b/optimum_benchmark/launchers/base.py new file mode 100644 index 000000000..30c13dd3e --- /dev/null +++ b/optimum_benchmark/launchers/base.py @@ -0,0 +1,31 @@ +from abc import ABC +from dataclasses import dataclass +from logging import getLogger +from typing import Callable, ClassVar, Generic, TypeVar + +LOGGER = getLogger("launcher") + + +@dataclass +class LauncherConfig(ABC): + name: str + _target_: str + + +LauncherConfigT = TypeVar("LauncherConfigT", bound=LauncherConfig) + + +class Launcher(Generic[LauncherConfigT], ABC): + NAME: ClassVar[str] + + config: LauncherConfigT + + def __init__(self) -> None: + pass + + def configure(self, config: "LauncherConfigT") -> None: + LOGGER.info(f"Configuring {self.NAME} launcher") + self.config = config + + def launch(self, worker: Callable, **worker_kwargs) -> None: + raise NotImplementedError("Launcher must implement launch method") diff --git a/optimum_benchmark/launchers/process/__init__.py b/optimum_benchmark/launchers/process/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/optimum_benchmark/launchers/process/config.py b/optimum_benchmark/launchers/process/config.py new file mode 100644 index 000000000..332b5dd6c --- /dev/null +++ b/optimum_benchmark/launchers/process/config.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from logging import getLogger + +from ..base import LauncherConfig + +LOGGER = getLogger("process") + + +@dataclass +class 
ProcessConfig(LauncherConfig): + name: str = "process" + _target_: str = "optimum_benchmark.launchers.process.launcher.ProcessLauncher" + + start_method: str = "spawn" + + def __post_init__(self) -> None: + if self.start_method not in ["spawn", "fork"]: + raise ValueError(f"start_method must be one of ['spawn', 'fork'], got {self.start_method}") diff --git a/optimum_benchmark/launchers/process/launcher.py b/optimum_benchmark/launchers/process/launcher.py new file mode 100644 index 000000000..afd421204 --- /dev/null +++ b/optimum_benchmark/launchers/process/launcher.py @@ -0,0 +1,53 @@ +import logging.config +import multiprocessing as mp +from logging import getLogger +from multiprocessing import Process +from typing import Callable + +from omegaconf import OmegaConf + +from ..base import Launcher +from .config import ProcessConfig + +LOGGER = getLogger("process") + + +class ProcessLauncher(Launcher[ProcessConfig]): + NAME = "process" + + def __init__(self) -> None: + super().__init__() + + def configure(self, config: ProcessConfig) -> None: + super().configure(config) + + def launch(self, worker: Callable, *worker_args) -> None: + # Set the multiprocessing start method if not already set + if mp.get_start_method(allow_none=True) is None: + mp.set_start_method(self.config.start_method) + + # Execute in a separate process + p = Process( + target=target, + args=(worker, *worker_args), + daemon=True, + ) + p.start() + benchmark = p.join() + + # Exit with the same exit code as the child process + if p.exitcode != 0: + LOGGER.error(f"Child process exited with code {p.exitcode}") + exit(p.exitcode) + else: + return benchmark + + +def target(fn, *args): + """ + This a pickalable function that correctly sets up the logging configuration + """ + hydra_conf = OmegaConf.load(".hydra/hydra.yaml") + logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) + + return fn(*args) diff --git a/optimum_benchmark/launchers/torchrun/__init__.py b/optimum_benchmark/launchers/torchrun/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/optimum_benchmark/launchers/torchrun/config.py b/optimum_benchmark/launchers/torchrun/config.py new file mode 100644 index 000000000..c139eace7 --- /dev/null +++ b/optimum_benchmark/launchers/torchrun/config.py @@ -0,0 +1,49 @@ +import os +from dataclasses import dataclass, field +from logging import getLogger +from typing import Any, Dict + +from omegaconf import OmegaConf + +from ..base import LauncherConfig + +LOGGER = getLogger("torchrun") + +OmegaConf.register_new_resolver("available_gpus", lambda: len(os.environ.get("CUDA_VISIBLE_DEVICES", "").split(","))) + + +@dataclass +class TorchrunConfig(LauncherConfig): + name: str = "torchrun" + _target_: str = "optimum_benchmark.launchers.torchrun.launcher.TorchrunLauncher" + + min_nodes: int = 1 + max_nodes: int = 1 + nproc_per_node: int = "${available_gpus:}" + run_id: str = "${experiment_name}" + role: str = "benchmark_worker" + monitor_interval: int = 30 + rdzv_endpoint: str = "localhost:29500" + rdzv_backend: str = "static" + rdzv_timeout: int = 900 + rdzv_configs: Dict[str, Any] = field(default_factory=lambda: {"rank": 0, "timeout": 900}) + max_restarts: int = 0 + start_method: str = "spawn" + metrics_cfg: str = "" + redirects: str = "0" + tee: str = "0" + local_addr: str = "" + log_dir: str = "" + + def __post_init__(self) -> None: + if self.start_method not in ["spawn", "fork"]: + raise ValueError(f"start_method must be one of ['spawn', 'fork'], got 
{self.start_method}") + + if self.min_nodes != self.max_nodes: + raise ValueError( + f"min_nodes and max_nodes must be equal for a reproducible benchmark, got {self.min_nodes} and {self.max_nodes}" + ) + + if self.min_nodes != 1: + LOGGER.info("For multi-node benchmarks, run the benchmark on each node separately.") + LOGGER.info(f"Waiting for the other nodes to be avaialable at {self.rdzv_endpoint}...") diff --git a/optimum_benchmark/launchers/torchrun/launcher.py b/optimum_benchmark/launchers/torchrun/launcher.py new file mode 100644 index 000000000..0025acc1c --- /dev/null +++ b/optimum_benchmark/launchers/torchrun/launcher.py @@ -0,0 +1,69 @@ +import logging.config +import os +from logging import getLogger +from typing import Callable + +from omegaconf import OmegaConf +from torch.distributed.elastic.multiprocessing import Std +from torch.distributed.elastic.multiprocessing.errors import record +from torch.distributed.launcher.api import LaunchConfig, launch_agent + +from ..base import Launcher +from .config import TorchrunConfig + +LOGGER = getLogger("torchrun") + + +class TorchrunLauncher(Launcher[TorchrunConfig]): + NAME = "torchrun" + + def __init__(self) -> None: + super().__init__() + + def configure(self, config: "TorchrunConfig") -> None: + super().configure(config) + + LOGGER.info(f"Running {self.config.nproc_per_node} processes per node") + + def launch(self, worker: Callable, *worker_args) -> None: + launch_config = LaunchConfig( + min_nodes=self.config.min_nodes, + max_nodes=self.config.max_nodes, + nproc_per_node=self.config.nproc_per_node, + run_id=self.config.run_id, + role=self.config.role, + monitor_interval=self.config.monitor_interval, + rdzv_endpoint=self.config.rdzv_endpoint, + rdzv_backend=self.config.rdzv_backend, + rdzv_configs=self.config.rdzv_configs, + max_restarts=self.config.max_restarts, + start_method=self.config.start_method, + metrics_cfg=self.config.metrics_cfg, + redirects=Std.from_str(self.config.redirects), + tee=Std.from_str(self.config.tee), + local_addr=self.config.local_addr, + log_dir=self.config.log_dir, + ) + + benchmarks = launch_agent( + entrypoint=entrypoint, + args=(worker, *worker_args), + config=launch_config, + ) + + return benchmarks[0] + + +@record +def entrypoint(fn, *args): + """ + This a pickalable function that correctly sets up the logging configuration + """ + + if os.environ.get("LOCAL_RANK", None) == "0": + hydra_conf = OmegaConf.load(".hydra/hydra.yaml") + logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) + else: + logging.disable(logging.CRITICAL) + + return fn(*args) diff --git a/optimum_benchmark/report.py b/optimum_benchmark/report.py deleted file mode 100644 index 26c2b9f8e..000000000 --- a/optimum_benchmark/report.py +++ /dev/null @@ -1,40 +0,0 @@ -import sys - -HELP = """ -Usage: optimum-report -Actions: - gather - display - summarize - plot - -h, --help - -For more information on each action, run: - optimum-report -h -""" - - -def main(): - action = sys.argv[1] - sys.argv = sys.argv[1:] - - if action == "gather": - from .aggregators.gather import gather_cli - - gather_cli() - elif action == "display": - from .aggregators.display import display_cli - - display_cli() - elif action == "summarize": - from .aggregators.summarize import summarize_cli - - summarize_cli() - elif action == "plot": - from .aggregators.plot import plot_cli - - plot_cli() - elif action in ["-h", "--help"]: - print(HELP) - else: - raise ValueError(f"Unknown action {action}") diff --git 
index 052809a7d..cca1a8f29 100644
--- a/optimum_benchmark/trackers/energy.py
+++ b/optimum_benchmark/trackers/energy.py
@@ -7,7 +7,7 @@
 if is_codecarbon_available():
     from codecarbon import EmissionsTracker, OfflineEmissionsTracker
 
-LOGGER = getLogger("latency_tracker")
+LOGGER = getLogger("energy")
 
 
 class EnergyTracker:
@@ -30,9 +30,12 @@ def track(self, interval=1, file_prefix=""):
             LOGGER.info("Falling back to Offline Emissions Tracker")
             country_iso_code = os.environ.get("COUNTRY_ISO_CODE", None)
             if country_iso_code is None:
-                raise ValueError(
-                    "COUNTRY_ISO_CODE environment variable must be set when using Offline Emissions Tracker"
+                LOGGER.warning(
+                    "Offline Emissions Tracker requires COUNTRY_ISO_CODE to be set. "
+                    "We will set it to FRA but the carbon footprint will be inaccurate."
                 )
+                country_iso_code = "FRA"
+
             self.emission_tracker = OfflineEmissionsTracker(
                 log_level="error",
                 tracking_mode="process",
diff --git a/optimum_benchmark/trackers/latency.py b/optimum_benchmark/trackers/latency.py
index d34be65f7..e63d5d0d0 100644
--- a/optimum_benchmark/trackers/latency.py
+++ b/optimum_benchmark/trackers/latency.py
@@ -3,25 +3,18 @@
 from logging import getLogger
 from typing import List
 
-import torch
-
-LOGGER = getLogger("latency_tracker")
+LOGGER = getLogger("latency")
 
 
 class LatencyTracker:
-    def __init__(self, device: torch.device, backend: str):
+    def __init__(self, device: str, backend: str):
         self.device = device
         self.backend = backend
         self.latencies: List[float] = []
 
-        if self.device.type == "cuda" and self.backend == "pytorch":
-            # because pytorch will always see devices as 0, 1, 2, ... CUDA_VISIBLE_DEVICES doesn't matter
-            self.device_ids = list(range(torch.cuda.device_count()))
-            LOGGER.info(f"Tracking Pytorch CUDA devices: {self.device_ids}")
-
     @contextmanager
     def track(self):
-        if self.device.type == "cuda" and self.backend == "pytorch":
+        if self.device == "cuda" and self.backend == "pytorch":
             yield from self._cuda_latency()
         else:
             yield from self._cpu_latency()
@@ -29,32 +22,32 @@ def track(self):
     def get_latencies(self):
         return self.latencies
 
+    def _cpu_latency(self):
+        start = time.perf_counter_ns()
+        yield
+        end = time.perf_counter_ns()
+        latency_ns = end - start
+        latency = latency_ns / 1e9
+
+        LOGGER.debug(f"Tracked CPU latency: {latency:.2e}s")
+        self.latencies.append(latency)
+
     def _cuda_latency(self):
+        import torch.cuda
+
         start_event = torch.cuda.Event(enable_timing=True)
         end_event = torch.cuda.Event(enable_timing=True)
-        for device_index in self.device_ids:
-            torch.cuda.synchronize(device=device_index)
-        # here must record the start event after the synchronization of all devices
-        start_event.record(stream=torch.cuda.current_stream(device=self.device_ids[-1]))
+
+        torch.cuda.synchronize()
+        start_event.record()
+        torch.cuda.synchronize()
         yield
-        for device_index in self.device_ids:
-            if device_index == self.device_ids[-1]:
-                # here we must record the end event before the synchronization of the last device
-                end_event.record(stream=torch.cuda.current_stream(device=self.device_ids[-1]))
-            torch.cuda.synchronize(device=device_index)
+        torch.cuda.synchronize()
+        end_event.record()
+        torch.cuda.synchronize()
 
         latency_ms = start_event.elapsed_time(end_event)
         latency = latency_ms / 1e3
 
         LOGGER.debug(f"Tracked CUDA latency: {latency:.2e}s")
         self.latencies.append(latency)
-
-    def _cpu_latency(self):
-        start = time.perf_counter_ns()
-        yield
-        end = time.perf_counter_ns()
-        latency_ns = end - start
-        latency = latency_ns / 1e9
-
-        LOGGER.debug(f"Tracked CPU latency: {latency:.2e}s")
-        self.latencies.append(latency)
diff --git a/optimum_benchmark/trackers/memory.py b/optimum_benchmark/trackers/memory.py
index 0ad7fac2b..97d45972b 100644
--- a/optimum_benchmark/trackers/memory.py
+++ b/optimum_benchmark/trackers/memory.py
@@ -10,7 +10,7 @@
 from ..env_utils import bytes_to_mega_bytes, is_nvidia_system, is_rocm_system
 from ..import_utils import is_py3nvml_available, is_pyrsmi_available
 
-LOGGER = getLogger("memory_tracker")
+LOGGER = getLogger("memory")
 
 
 class MemoryTracker:
diff --git a/setup.py b/setup.py
index 9ad637536..21a5998cc 100644
--- a/setup.py
+++ b/setup.py
@@ -10,9 +10,9 @@
     f"optimum>={OPTIMUM_VERSION}",  # backends, tasks and input generation
     "accelerate",  # distributed inference and no weights init
     # Hydra
-    "omegaconf",
-    "hydra-core",
     "hydra_colorlog",
+    "hydra-core",
+    "omegaconf",
     # Other
     "psutil",
     "pandas",
@@ -53,9 +53,10 @@
     # gpu backends
     "onnxruntime-gpu": [f"optimum[onnxruntime-gpu]>={OPTIMUM_VERSION}"],
     "onnxruntime-training": ["torch-ort", "onnxruntime-training"],
-    # server-like backends
-    "text-generation-inference": ["docker>=6.0.0"],
+    # docker-based backends
+    "text-generation-inference": ["docker"],
     # specific settings
+    "deepspeed": ["deepspeed"],
     "diffusers": ["diffusers"],
     "peft": ["peft"],
 }
@@ -69,8 +70,8 @@
     extras_require=EXTRAS_REQUIRE,
     entry_points={
         "console_scripts": [
-            "optimum-benchmark=optimum_benchmark.experiment:main",
-            "optimum-report=optimum_benchmark.report:main",
+            "optimum-benchmark=optimum_benchmark.cli:benchmark_cli",
+            "optimum-report=optimum_benchmark.cli:report_cli",
         ]
     },
 )
diff --git a/tests/configs/_base_.yaml b/tests/configs/_base_.yaml
index af2811c68..d989ee01f 100644
--- a/tests/configs/_base_.yaml
+++ b/tests/configs/_base_.yaml
@@ -16,13 +16,12 @@ hydra:
     # this is useful for saving outputs in a separate directory
     chdir: true
    env_set:
-      # by default, we only use one GPU
-      CUDA_VISIBLE_DEVICES: 0
-      CUDA_DEVICE_ORDER: PCI_BUS_ID
+      CUDA_VISIBLE_DEVICES: 0 # by default we only use one GPU
+      CUDA_DEVICE_ORDER: PCI_BUS_ID # making sure we use the right GPU
+      OVERRIDE_BENCHMARKS: 1 # to not skip benchmarks
 
 backend:
   # we turn off isolation checks because tests run on shared resources
-  initial_isolation_check: false
-  continous_isolation_check: false
+  continuous_isolation: false
 
 experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}
diff --git a/tests/configs/_ddp_.yaml b/tests/configs/_ddp_.yaml
index ddfbf8b70..591554b01 100644
--- a/tests/configs/_ddp_.yaml
+++ b/tests/configs/_ddp_.yaml
@@ -1,19 +1,20 @@
 # Distributed Data Parallel (DDP) training
+defaults:
+  - launcher: torchrun
+
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
+
 experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_ddp
 
+launcher:
+  nproc_per_node: 2
+
 benchmark:
   dataset_shapes:
     dataset_size: 1600
     sequence_length: 256
   training_arguments:
     per_device_train_batch_size: 8
-
-backend:
-  use_ddp: true
-  ddp_config:
-    rdzv_endpoint: 127.0.0.1:29509
-
-hydra:
-  job:
-    env_set:
-      CUDA_VISIBLE_DEVICES: 0,1
diff --git a/tests/configs/_dp_.yaml b/tests/configs/_dp_.yaml
index 9cb390748..848748268 100644
--- a/tests/configs/_dp_.yaml
+++ b/tests/configs/_dp_.yaml
@@ -1,4 +1,9 @@
 # Data Parallel (DP) training
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
+
 experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_dp
 
 benchmark:
@@ -7,8 +12,3 @@ benchmark:
   dataset_shapes:
     sequence_length: 256
   training_arguments:
     per_device_train_batch_size: 8
-
-hydra:
-  job:
-    env_set:
-      CUDA_VISIBLE_DEVICES: 0,1
diff --git a/tests/configs/_pp_.yaml b/tests/configs/_pp_.yaml
index 4bb900c33..8e15d0010 100644
--- a/tests/configs/_pp_.yaml
+++ b/tests/configs/_pp_.yaml
@@ -1,10 +1,10 @@
 # Pipeline Parallelism (PP)
-experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_mp
-
-backend:
-  device_map: auto
-
 hydra:
   job:
     env_set:
       CUDA_VISIBLE_DEVICES: 0,1
+
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_mp
+
+backend:
+  device_map: auto
diff --git a/tests/configs/_tp_.yaml b/tests/configs/_tp_.yaml
new file mode 100644
index 000000000..928d5bd10
--- /dev/null
+++ b/tests/configs/_tp_.yaml
@@ -0,0 +1,19 @@
+# Tensor Parallelism (TP)
+defaults:
+  - launcher: torchrun # distributed runner
+
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
+
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_tp
+
+launcher:
+  nproc_per_node: 2
+
+backend:
+  deepspeed_inference: true
+  deepspeed_inference_config:
+    tensor_parallel:
+      tp_size: 2
diff --git a/tests/configs/cuda_pytorch_inference_bert_tp.yaml b/tests/configs/cuda_pytorch_inference_bert_tp.yaml
new file mode 100644
index 000000000..a59aa36b3
--- /dev/null
+++ b/tests/configs/cuda_pytorch_inference_bert_tp.yaml
@@ -0,0 +1,10 @@
+defaults:
+  - benchmark: inference
+  - backend: pytorch
+  - _base_ # inherits from base config
+  - _tp_ # inherits from tensor parallelism config
+  - _self_ # for hydra 1.1 compatibility
+
+model: hf-internal-testing/tiny-random-bert
+task: text-classification
+device: cuda
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 4bce083e9..755a7abc2 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -21,7 +21,7 @@
 
 
 @pytest.mark.parametrize("config_file", SINGLERUNS)
-def test_configs(config_file):
+def test_single_run(config_file):
     config_name = config_file.split(".")[0]
 
     result = subprocess.run(