From 1f13d1927676b6d5d18ba34d20690f052918be40 Mon Sep 17 00:00:00 2001 From: fandaoyi Date: Sun, 24 Mar 2024 10:32:40 +0000 Subject: [PATCH 1/6] add batch wrap --- dipu/torch_dipu/dipu/distributed.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py index 8b1cfd93c..9eeb2a86b 100644 --- a/dipu/torch_dipu/dipu/distributed.py +++ b/dipu/torch_dipu/dipu/distributed.py @@ -39,6 +39,7 @@ def reg_dicl(store, rank, size, timeout): def _wrapped_register_backend(self, device, backend_type, backend): # dicl not support cpu tensor if device.type == "cpu" and isinstance(backend, ProcessGroupDICL): + print(" try create a gloo backend,", backend.size(), flush=True) backend = ProcessGroupGloo( backend.store(), backend.rank(), backend.size(), timeout=backend.timeout() ) @@ -85,8 +86,31 @@ def _wrap_get_backend(group: Optional[ProcessGroup] = None) -> str: else: return ret +# dicl not support coalescing now. Todo: remove after support +def wrap_batch_isend_irecv(p2p_op_list): + dist.distributed_c10d._check_p2p_op_list(p2p_op_list) + reqs = [] + for p2p_op in p2p_op_list: + work = p2p_op.op(p2p_op.tensor, p2p_op.peer, p2p_op.group, p2p_op.tag) + if work: + reqs.append(work) + return reqs + +# huawei AscendSpeed pass rank list like [0, 0], which cause pg +# creation fail in torch 2.0. actually it's huawei's problem, such input +# is not valid, but nothing else we can do. +# torch 2.1 handle duplication rank inner +_raw_new_group = dist.new_group +def wrap_new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None): + ranks = list(set(ranks)) # dedup + return _raw_new_group(ranks, timeout, backend, pg_options) + def apply_dist_patch(): dist.get_backend = _wrap_get_backend dist.init_process_group = _wrap_init_process_groups dist.ProcessGroup._register_backend = _wrapped_register_backend + dist.batch_isend_irecv = wrap_batch_isend_irecv + + if dipu.get_dipu_torch_version() == dipu.torch_ver_200: + dist.new_group = wrap_new_group From 8b53d2681e37a6dffcf481b664623c8c8a4bcb0f Mon Sep 17 00:00:00 2001 From: fandaoyi Date: Sun, 24 Mar 2024 20:00:32 +0800 Subject: [PATCH 2/6] wrap tensor for ascend speed --- .../python/individual_scripts/test_rt_ddp.py | 20 +++++++++++++++++++ .../individual_scripts/test_rt_tensor.py | 3 +++ dipu/torch_dipu/dipu/distributed.py | 6 +++--- dipu/torch_dipu/dipu/tensor.py | 13 +++++++++++- 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/dipu/tests/python/individual_scripts/test_rt_ddp.py b/dipu/tests/python/individual_scripts/test_rt_ddp.py index d1d1a6cfb..23c0de3ce 100644 --- a/dipu/tests/python/individual_scripts/test_rt_ddp.py +++ b/dipu/tests/python/individual_scripts/test_rt_ddp.py @@ -288,6 +288,24 @@ def demo_allgather_gloo(rank, world_size, port): dist.barrier() cleanup() +def test_special_group_stuck(rank, world_size): + import torch_dipu + + print(f"test special group stuck on rank {rank} ") + + setup(rank, world_size) + + # ranks check require len(ranks) <= world_size + if world_size >= 2: + # torch 2.0 gloo pg has such a limitition. pass in duplicated rank will stuck. + # but huawei do. 
+ ranks_dup = [rank, rank] + group = torch.distributed.new_group(ranks_dup) + print(group) + dist.destroy_process_group(group) + + cleanup() + if __name__ == "__main__": n_gpus = torch.cuda.device_count() @@ -311,3 +329,5 @@ def demo_allgather_gloo(rank, world_size, port): # run_demo(demo_bcast, world_size, port) # run_demo(demo_model_parallel, world_size) + + # run_demo(test_special_group_stuck, world_size) diff --git a/dipu/tests/python/individual_scripts/test_rt_tensor.py b/dipu/tests/python/individual_scripts/test_rt_tensor.py index f2757f759..dd7c8ca04 100644 --- a/dipu/tests/python/individual_scripts/test_rt_tensor.py +++ b/dipu/tests/python/individual_scripts/test_rt_tensor.py @@ -152,6 +152,9 @@ def test_type(): res = isinstance(s4, torch.cuda.FloatTensor) assert res == True + assert dev1 in s1.type() + assert s1.device.type == dev1 + def test_device_copy(): import torch_dipu diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py index 9eeb2a86b..4d633a62c 100644 --- a/dipu/torch_dipu/dipu/distributed.py +++ b/dipu/torch_dipu/dipu/distributed.py @@ -86,7 +86,7 @@ def _wrap_get_backend(group: Optional[ProcessGroup] = None) -> str: else: return ret -# dicl not support coalescing now. Todo: remove after support +# dicl not support coalescing now. Todo: remove after support coalesce def wrap_batch_isend_irecv(p2p_op_list): dist.distributed_c10d._check_p2p_op_list(p2p_op_list) reqs = [] @@ -97,9 +97,9 @@ def wrap_batch_isend_irecv(p2p_op_list): return reqs # huawei AscendSpeed pass rank list like [0, 0], which cause pg -# creation fail in torch 2.0. actually it's huawei's problem, such input +# creation fail in torch 2.0. actually it's huawei's problem, such list # is not valid, but nothing else we can do. -# torch 2.1 handle duplication rank inner +# torch 2.1 handle duplication rank inner. _raw_new_group = dist.new_group def wrap_new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None): ranks = list(set(ranks)) # dedup diff --git a/dipu/torch_dipu/dipu/tensor.py b/dipu/torch_dipu/dipu/tensor.py index aae8667ac..306c93fee 100644 --- a/dipu/torch_dipu/dipu/tensor.py +++ b/dipu/torch_dipu/dipu/tensor.py @@ -1,7 +1,7 @@ # Copyright (c) 2023, DeepLink. 
 import torch
-from .device import __diputype__
+from .device import __diputype__, __dipu_device_type__
 from torch_dipu import _C, mockcuda
 
 
@@ -16,8 +16,19 @@ def __set_default_tensor_type(type=torch.FloatTensor):
     _default_tensor_type = type
 
 
+_raw_tensor_type = torch.Tensor.type
+
+def wrap_tensor_type(self, *args, **kwargs):
+  ret = _raw_tensor_type(self, *args, **kwargs)
+  if isinstance(ret, str):
+    return ret.replace(__dipu_device_type__, "cuda")
+  else:
+    return ret
+
+
 # need enhance, seems change tensor define is need
 def apply_tensor_type_patch():
     torch.set_default_tensor_type = __set_default_tensor_type
     if mockcuda:
         _C._mockCudaTensor()
+    torch.Tensor.type = wrap_tensor_type

From ea7bbfd732912e10955a8fd1b298e1bc04278ff1 Mon Sep 17 00:00:00 2001
From: fandaoyi
Date: Sun, 24 Mar 2024 20:01:52 +0800
Subject: [PATCH 3/6] format

---
 .../python/individual_scripts/test_rt_ddp.py | 33 ++++++++++---------
 .../individual_scripts/test_rt_tensor.py     |  2 +-
 dipu/torch_dipu/dipu/distributed.py          | 11 +++++--
 dipu/torch_dipu/dipu/tensor.py               | 11 ++++---
 4 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/dipu/tests/python/individual_scripts/test_rt_ddp.py b/dipu/tests/python/individual_scripts/test_rt_ddp.py
index 23c0de3ce..d5c7d83fa 100644
--- a/dipu/tests/python/individual_scripts/test_rt_ddp.py
+++ b/dipu/tests/python/individual_scripts/test_rt_ddp.py
@@ -288,23 +288,24 @@ def demo_allgather_gloo(rank, world_size, port):
     dist.barrier()
     cleanup()
 
+
 def test_special_group_stuck(rank, world_size):
-  import torch_dipu
-
-  print(f"test special group stuck on rank {rank} ")
-
-  setup(rank, world_size)
-
-  # ranks check require len(ranks) <= world_size
-  if world_size >= 2:
-    # torch 2.0 gloo pg has such a limitition. pass in duplicated rank will stuck.
-    # but huawei do.
-    ranks_dup = [rank, rank]
-    group = torch.distributed.new_group(ranks_dup)
-    print(group)
-    dist.destroy_process_group(group)
-
-  cleanup()
+    import torch_dipu
+
+    print(f"test special group stuck on rank {rank} ")
+
+    setup(rank, world_size)
+
+    # ranks check require len(ranks) <= world_size
+    if world_size >= 2:
+        # torch 2.0 gloo pg has such a limitation: passing in a duplicated rank will get stuck.
+        # but huawei's AscendSpeed does pass duplicated ranks.
+        ranks_dup = [rank, rank]
+        group = torch.distributed.new_group(ranks_dup)
+        print(group)
+        dist.destroy_process_group(group)
+
+    cleanup()
 
 
 if __name__ == "__main__":
diff --git a/dipu/tests/python/individual_scripts/test_rt_tensor.py b/dipu/tests/python/individual_scripts/test_rt_tensor.py
index dd7c8ca04..042c584a8 100644
--- a/dipu/tests/python/individual_scripts/test_rt_tensor.py
+++ b/dipu/tests/python/individual_scripts/test_rt_tensor.py
@@ -154,7 +154,7 @@ def test_type():
 
     assert dev1 in s1.type()
     assert s1.device.type == dev1
-    
+
 
 def test_device_copy():
     import torch_dipu
diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py
index 4d633a62c..e77bef8bd 100644
--- a/dipu/torch_dipu/dipu/distributed.py
+++ b/dipu/torch_dipu/dipu/distributed.py
@@ -39,7 +39,6 @@ def reg_dicl(store, rank, size, timeout):
 def _wrapped_register_backend(self, device, backend_type, backend):
     # dicl not support cpu tensor
     if device.type == "cpu" and isinstance(backend, ProcessGroupDICL):
-        print(" try create a gloo backend,", backend.size(), flush=True)
         backend = ProcessGroupGloo(
             backend.store(), backend.rank(), backend.size(), timeout=backend.timeout()
         )
@@ -86,6 +85,7 @@ def _wrap_get_backend(group: Optional[ProcessGroup] = None) -> str:
     else:
         return ret
+
 # dicl not support coalescing now. Todo: remove after support coalesce
 def wrap_batch_isend_irecv(p2p_op_list):
     dist.distributed_c10d._check_p2p_op_list(p2p_op_list)
     reqs = []
@@ -96,12 +96,17 @@ def wrap_batch_isend_irecv(p2p_op_list):
             reqs.append(work)
     return reqs
 
-# huawei AscendSpeed pass rank list like [0, 0], which cause pg
+
+# huawei AscendSpeed pass rank list like [0, 0], which cause pg
 # creation fail in torch 2.0. actually it's huawei's problem, such list
 # is not valid, but nothing else we can do.
 # torch 2.1 handle duplication rank inner.
 _raw_new_group = dist.new_group
-def wrap_new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None):
+
+
+def wrap_new_group(
+    ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None
+):
     ranks = list(set(ranks))  # dedup
     return _raw_new_group(ranks, timeout, backend, pg_options)
 
diff --git a/dipu/torch_dipu/dipu/tensor.py b/dipu/torch_dipu/dipu/tensor.py
index 306c93fee..c11905366 100644
--- a/dipu/torch_dipu/dipu/tensor.py
+++ b/dipu/torch_dipu/dipu/tensor.py
@@ -18,12 +18,13 @@ def __set_default_tensor_type(type=torch.FloatTensor):
 
 
 _raw_tensor_type = torch.Tensor.type
 
+
 def wrap_tensor_type(self, *args, **kwargs):
-  ret = _raw_tensor_type(self, *args, **kwargs)
-  if isinstance(ret, str):
-    return ret.replace(__dipu_device_type__, "cuda")
-  else:
-    return ret
+    ret = _raw_tensor_type(self, *args, **kwargs)
+    if isinstance(ret, str):
+        return ret.replace(__dipu_device_type__, "cuda")
+    else:
+        return ret
 
 

From 3a662bcab2858d1d5056ee7cb21f822ba7c9fef2 Mon Sep 17 00:00:00 2001
From: fandaoyi
Date: Sun, 24 Mar 2024 20:19:16 +0800
Subject: [PATCH 4/6] rename wrap

---
 dipu/torch_dipu/dipu/distributed.py | 8 ++++----
 dipu/torch_dipu/dipu/tensor.py      | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py
index e77bef8bd..d47ff7862 100644
--- a/dipu/torch_dipu/dipu/distributed.py
+++ b/dipu/torch_dipu/dipu/distributed.py
@@ -87,7 +87,7 @@ def _wrap_get_backend(group: Optional[ProcessGroup] = None) -> str:
 
 # dicl not support coalescing now. Todo: remove after support coalesce
-def wrap_batch_isend_irecv(p2p_op_list):
+def _wrap_batch_isend_irecv(p2p_op_list):
     dist.distributed_c10d._check_p2p_op_list(p2p_op_list)
     reqs = []
     for p2p_op in p2p_op_list:
@@ -104,7 +104,7 @@ def wrap_batch_isend_irecv(p2p_op_list):
 _raw_new_group = dist.new_group
 
 
-def wrap_new_group(
+def _wrap_new_group(
     ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None
 ):
     ranks = list(set(ranks))  # dedup
@@ -115,7 +115,7 @@ def apply_dist_patch():
     dist.get_backend = _wrap_get_backend
     dist.init_process_group = _wrap_init_process_groups
     dist.ProcessGroup._register_backend = _wrapped_register_backend
-    dist.batch_isend_irecv = wrap_batch_isend_irecv
+    dist.batch_isend_irecv = _wrap_batch_isend_irecv
 
     if dipu.get_dipu_torch_version() == dipu.torch_ver_200:
-        dist.new_group = wrap_new_group
+        dist.new_group = _wrap_new_group
diff --git a/dipu/torch_dipu/dipu/tensor.py b/dipu/torch_dipu/dipu/tensor.py
index c11905366..6e151f4e4 100644
--- a/dipu/torch_dipu/dipu/tensor.py
+++ b/dipu/torch_dipu/dipu/tensor.py
@@ -19,7 +19,7 @@ def __set_default_tensor_type(type=torch.FloatTensor):
 _raw_tensor_type = torch.Tensor.type
 
 
-def wrap_tensor_type(self, *args, **kwargs):
+def _wrap_tensor_type(self, *args, **kwargs):
     ret = _raw_tensor_type(self, *args, **kwargs)
     if isinstance(ret, str):
         return ret.replace(__dipu_device_type__, "cuda")
@@ -32,4 +32,4 @@ def apply_tensor_type_patch():
     torch.set_default_tensor_type = __set_default_tensor_type
     if mockcuda:
         _C._mockCudaTensor()
-    torch.Tensor.type = wrap_tensor_type
+    torch.Tensor.type = _wrap_tensor_type

From fb0875f9b663e258392abba279ee7361bd7e4688 Mon Sep 17 00:00:00 2001
From: fandaoyi
Date: Sun, 24 Mar 2024 13:03:44 +0000
Subject: [PATCH 5/6] add comment

---
 dipu/torch_dipu/dipu/distributed.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py
index d47ff7862..0eae1a38d 100644
--- a/dipu/torch_dipu/dipu/distributed.py
+++ b/dipu/torch_dipu/dipu/distributed.py
@@ -97,10 +97,11 @@ def _wrap_batch_isend_irecv(p2p_op_list):
     return reqs
 
 
-# huawei AscendSpeed pass rank list like [0, 0], which cause pg
+# huawei AscendSpeed pass rank list like [0, 0], which cause gloo pg
 # creation fail in torch 2.0. actually it's huawei's problem, such list
 # is not valid, but nothing else we can do.
-# torch 2.1 handle duplication rank inner.
+# torch 2.1 does not create a gloo sub-device pg when creating a dicl pg, so pg creation does not get stuck.
+# we keep that behavior. but even if such a pg is created, it still gets stuck on any real communication.
 _raw_new_group = dist.new_group
 
 
 def _wrap_new_group(

From 07dcb0fc69ddab60d399710df194fec4d69e528d Mon Sep 17 00:00:00 2001
From: fandaoyi
Date: Mon, 25 Mar 2024 06:03:01 +0000
Subject: [PATCH 6/6] fix comment

---
 dipu/tests/python/individual_scripts/test_rt_ddp.py | 4 +---
 dipu/torch_dipu/dipu/distributed.py                 | 7 +++++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/dipu/tests/python/individual_scripts/test_rt_ddp.py b/dipu/tests/python/individual_scripts/test_rt_ddp.py
index d5c7d83fa..1fed653da 100644
--- a/dipu/tests/python/individual_scripts/test_rt_ddp.py
+++ b/dipu/tests/python/individual_scripts/test_rt_ddp.py
@@ -310,9 +310,7 @@ def test_special_group_stuck(rank, world_size):
 
 if __name__ == "__main__":
     n_gpus = torch.cuda.device_count()
-    # world_size = 1
-    # demo_allreduce(0, world_size)
-    # demo_basic_ddp(0, world_size)
+
     port = random.randint(10000, 60000)
 
     world_size = 1
diff --git a/dipu/torch_dipu/dipu/distributed.py b/dipu/torch_dipu/dipu/distributed.py
index 0eae1a38d..73e0cfbac 100644
--- a/dipu/torch_dipu/dipu/distributed.py
+++ b/dipu/torch_dipu/dipu/distributed.py
@@ -86,7 +86,8 @@ def _wrap_get_backend(group: Optional[ProcessGroup] = None) -> str:
         return ret
 
-# dicl not support coalescing now. Todo: remove after support coalesce
+# dicl not support coalescing now, so batch_isend_irecv crashes on torch 2.1.
+# Todo: remove after coalescing is supported.
 def _wrap_batch_isend_irecv(p2p_op_list):
     dist.distributed_c10d._check_p2p_op_list(p2p_op_list)
     reqs = []
@@ -116,7 +117,9 @@ def apply_dist_patch():
     dist.get_backend = _wrap_get_backend
     dist.init_process_group = _wrap_init_process_groups
     dist.ProcessGroup._register_backend = _wrapped_register_backend
-    dist.batch_isend_irecv = _wrap_batch_isend_irecv
+    # remove the batch_isend_irecv wrapper once coalescing is ready
+    if dipu.get_dipu_torch_version() != dipu.torch_ver_200:
+        dist.batch_isend_irecv = _wrap_batch_isend_irecv
 
     if dipu.get_dipu_torch_version() == dipu.torch_ver_200:
         dist.new_group = _wrap_new_group
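
A minimal usage sketch of the patched entry points described in this series (illustrative only, not part of the patches). It assumes that importing torch_dipu installs apply_dist_patch(), that the process group backend string for ProcessGroupDICL is "dicl", and that mockcuda maps "cuda" tensors and device calls onto the DIPU device; adjust those names to the actual deployment.

# batch_p2p_sketch.py - hypothetical example exercising the wrapped dist APIs
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch_dipu  # assumption: importing torch_dipu applies apply_dist_patch()


def run(rank, world_size):
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    torch.cuda.set_device(rank)  # with mockcuda, "cuda" calls target the DIPU device
    # the backend name "dicl" is an assumption based on ProcessGroupDICL in the diffs
    dist.init_process_group("dicl", rank=rank, world_size=world_size)

    # Mirrors test_special_group_stuck: an intentionally duplicated rank list.
    # On torch 2.0 the wrapped dist.new_group deduplicates it before group creation.
    group = dist.new_group([rank, rank])
    print(group)

    if world_size >= 2:
        # On torch versions other than 2.0, the wrapped dist.batch_isend_irecv posts
        # each isend/irecv directly instead of relying on coalescing, which dicl
        # does not support yet.
        peer = (rank + 1) % world_size
        send_buf = torch.ones(4, device="cuda")
        recv_buf = torch.empty(4, device="cuda")
        ops = [
            dist.P2POp(dist.isend, send_buf, peer),
            dist.P2POp(dist.irecv, recv_buf, peer),
        ]
        for req in dist.batch_isend_irecv(ops):
            req.wait()

    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run, args=(world_size,), nprocs=world_size)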