From a2091d89c06f24027c68683ebc27bbf7fd7ccef7 Mon Sep 17 00:00:00 2001 From: panxuchen Date: Tue, 7 May 2024 20:23:58 +0800 Subject: [PATCH] add ray test in workflow --- .github/workflows/unittest.yml | 5 ++++ data_juicer/core/ray_executor.py | 39 +++++++++++++---------------- data_juicer/utils/unittest_utils.py | 18 +++++++------ tests/run.py | 4 +-- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index ed1a10718..3ea19b3bb 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -42,6 +42,11 @@ jobs: run: | docker compose exec ray-head python tests/run.py --tag standalone + - name: Run unittest ray + working-directory: dj-${{ github.run_id }}/.github/workflows/docker + run: | + docker compose exec ray-head python tests/run.py --tag ray + - name: Remove docker compose working-directory: dj-${{ github.run_id }}/.github/workflows/docker if: always() diff --git a/data_juicer/core/ray_executor.py b/data_juicer/core/ray_executor.py index 3cf7acbb3..1c066ea02 100644 --- a/data_juicer/core/ray_executor.py +++ b/data_juicer/core/ray_executor.py @@ -122,38 +122,33 @@ def run_op(self, op, op_cfg, dataset): batch_size=1) # The batch size here is same as in data.py else: - dataset = dataset.map_batches( - partial(ray_batch_mapper_wrapper, - fn=op.process), - batch_format='pyarrow', - num_gpus=num_gpus, - batch_size=1) + dataset = dataset.map_batches(partial( + ray_batch_mapper_wrapper, fn=op.process), + batch_format='pyarrow', + num_gpus=num_gpus, + batch_size=1) # The batch size here is same as in data.py else: if use_actor: - dataset = dataset.map( - op_cls, - compute=ActorPoolStrategy(), - concurrency=op_proc, - fn_constructor_kwargs=op_args, - num_gpus=num_gpus) + dataset = dataset.map(op_cls, + compute=ActorPoolStrategy(), + concurrency=op_proc, + fn_constructor_kwargs=op_args, + num_gpus=num_gpus) else: - dataset = dataset.map(op.process, - num_gpus=num_gpus) + dataset = dataset.map(op.process, num_gpus=num_gpus) elif isinstance(op, Filter): if use_actor: dataset = dataset.map(op_cls, - compute=ActorPoolStrategy(), - concurrency=op_proc, - fn_constructor_kwargs=op_args, - num_gpus=num_gpus) + compute=ActorPoolStrategy(), + concurrency=op_proc, + fn_constructor_kwargs=op_args, + num_gpus=num_gpus) else: - dataset = dataset.map(op.compute_stats, - num_gpus=num_gpus) + dataset = dataset.map(op.compute_stats, num_gpus=num_gpus) if op.stats_export_path is not None: - dataset.write_json(op.stats_export_path, - force_ascii=False) + dataset.write_json(op.stats_export_path, force_ascii=False) dataset = dataset.filter(op.process) else: logger.error( diff --git a/data_juicer/utils/unittest_utils.py b/data_juicer/utils/unittest_utils.py index 546a28dc3..d7ee7d87d 100644 --- a/data_juicer/utils/unittest_utils.py +++ b/data_juicer/utils/unittest_utils.py @@ -3,8 +3,8 @@ import unittest import numpy -import ray.data as rd import pyarrow as pa +import ray.data as rd from datasets import Dataset from data_juicer.ops import Filter @@ -16,7 +16,7 @@ def TEST_TAG(*tags): """Tags for test case. - Currently, `standalone`, `ray`, `standalone-gpu`, `ray-gpu` are supported. + Currently, `standalone`, `ray` are supported. """ def decorator(func): @@ -56,13 +56,15 @@ def generate_dataset(self, data): """Generate dataset for a specific executor. Args: - type (str, optional): "standalone" or "ray". Defaults to "standalone". + type (str, optional): "standalone" or "ray". + Defaults to "standalone". """ if self.current_tag.startswith('standalone'): return Dataset.from_list(data) elif self.current_tag.startswith('ray'): dataset = rd.from_items(data) if Fields.stats not in dataset.columns(fetch_if_missing=False): + def process_batch_arrow(table: pa.Table) -> pa.Table: new_column_data = [{} for _ in range(len(table))] new_talbe = table.append_column(Fields.stats, @@ -70,7 +72,7 @@ def process_batch_arrow(table: pa.Table) -> pa.Table: return new_talbe dataset = dataset.map_batches(process_batch_arrow, - batch_format='pyarrow') + batch_format='pyarrow') return dataset else: raise ValueError('Unsupported type') @@ -91,15 +93,17 @@ def run_single_op(self, dataset, op, column_names): dataset = dataset.to_pandas().get(column_names) if dataset is None: return [] - return dataset.to_dict(orient="records") + return dataset.to_dict(orient='records') else: raise ValueError('Unsupported type') def assertDatasetEqual(self, first, second): + def convert_record(rec): for key in rec.keys(): # Convert incomparable `list` to comparable `tuple` - if isinstance(rec[key], numpy.ndarray) or isinstance(rec[key], list): + if isinstance(rec[key], numpy.ndarray) or isinstance( + rec[key], list): rec[key] = tuple(rec[key]) return rec @@ -107,4 +111,4 @@ def convert_record(rec): second = [convert_record(d) for d in second] first = sorted(first, key=lambda x: tuple(sorted(x.items()))) second = sorted(second, key=lambda x: tuple(sorted(x.items()))) - return self.assertEqual(first, second) \ No newline at end of file + return self.assertEqual(first, second) diff --git a/tests/run.py b/tests/run.py index 080404b8a..81028ee01 100644 --- a/tests/run.py +++ b/tests/run.py @@ -19,7 +19,7 @@ sys.path.append(file_dir) parser = argparse.ArgumentParser('test runner') -parser.add_argument('--tag', choices=["standalone", "standalone-gpu", "ray", "ray-gpu"], +parser.add_argument('--tag', choices=["standalone", "ray"], default="standalone", help="the tag of tests being run") parser.add_argument('--pattern', default='test_*.py', help='test file pattern') @@ -30,7 +30,7 @@ class TaggedTestLoader(unittest.TestLoader): - def __init__(self, tag=None): + def __init__(self, tag="standalone"): super().__init__() self.tag = tag