Commit
add ray test in workflow
pan-x-c committed May 7, 2024
1 parent 6e6409d commit a2091d8
Showing 4 changed files with 35 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/unittest.yml
@@ -42,6 +42,11 @@ jobs:
run: |
docker compose exec ray-head python tests/run.py --tag standalone
- name: Run unittest ray
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head python tests/run.py --tag ray
- name: Remove docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
if: always()
39 changes: 17 additions & 22 deletions data_juicer/core/ray_executor.py
@@ -122,38 +122,33 @@ def run_op(self, op, op_cfg, dataset):
batch_size=1)
# The batch size here is same as in data.py
else:
dataset = dataset.map_batches(
partial(ray_batch_mapper_wrapper,
fn=op.process),
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
dataset = dataset.map_batches(partial(
ray_batch_mapper_wrapper, fn=op.process),
batch_format='pyarrow',
num_gpus=num_gpus,
batch_size=1)
# The batch size here is same as in data.py
else:
if use_actor:
dataset = dataset.map(
op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
dataset = dataset.map(op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.process,
num_gpus=num_gpus)
dataset = dataset.map(op.process, num_gpus=num_gpus)

elif isinstance(op, Filter):
if use_actor:
dataset = dataset.map(op_cls,
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
compute=ActorPoolStrategy(),
concurrency=op_proc,
fn_constructor_kwargs=op_args,
num_gpus=num_gpus)
else:
dataset = dataset.map(op.compute_stats,
num_gpus=num_gpus)
dataset = dataset.map(op.compute_stats, num_gpus=num_gpus)
if op.stats_export_path is not None:
dataset.write_json(op.stats_export_path,
force_ascii=False)
dataset.write_json(op.stats_export_path, force_ascii=False)
dataset = dataset.filter(op.process)
else:
logger.error(
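For readers skimming the reformatted calls above, the following is a minimal, self-contained sketch of the Ray Data `map_batches` pattern that `run_op` relies on: a per-batch function (standing in here for `ray_batch_mapper_wrapper` wrapping `op.process`) is bound with `functools.partial` and applied to pyarrow batches of size 1. The column and function names are illustrative, not taken from the repository.

from functools import partial

import pyarrow as pa
import ray


def append_text_len(table: pa.Table, column: str) -> pa.Table:
    # Illustrative stand-in for ray_batch_mapper_wrapper(fn=op.process):
    # append a column holding the length of each text sample.
    lengths = [len(t) for t in table.column(column).to_pylist()]
    return table.append_column('text_len', [lengths])


ray.init(ignore_reinit_error=True)
ds = ray.data.from_items([{'text': 'hello'}, {'text': 'data juicer'}])
# Same call shape as in run_op: pyarrow batch format and batch size 1;
# num_gpus can additionally be forwarded to the underlying Ray tasks.
ds = ds.map_batches(partial(append_text_len, column='text'),
                    batch_format='pyarrow',
                    batch_size=1)
print(ds.take_all())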
18 changes: 11 additions & 7 deletions data_juicer/utils/unittest_utils.py
@@ -3,8 +3,8 @@
import unittest

import numpy
import ray.data as rd
import pyarrow as pa
import ray.data as rd
from datasets import Dataset

from data_juicer.ops import Filter
@@ -16,7 +16,7 @@

def TEST_TAG(*tags):
"""Tags for test case.
Currently, `standalone`, `ray`, `standalone-gpu`, `ray-gpu` are supported.
Currently, `standalone`, `ray` are supported.
"""

def decorator(func):
@@ -56,21 +56,23 @@ def generate_dataset(self, data):
"""Generate dataset for a specific executor.
Args:
type (str, optional): "standalone" or "ray". Defaults to "standalone".
type (str, optional): "standalone" or "ray".
Defaults to "standalone".
"""
if self.current_tag.startswith('standalone'):
return Dataset.from_list(data)
elif self.current_tag.startswith('ray'):
dataset = rd.from_items(data)
if Fields.stats not in dataset.columns(fetch_if_missing=False):

def process_batch_arrow(table: pa.Table) -> pa.Table:
new_column_data = [{} for _ in range(len(table))]
new_talbe = table.append_column(Fields.stats,
[new_column_data])
return new_talbe

dataset = dataset.map_batches(process_batch_arrow,
batch_format='pyarrow')
batch_format='pyarrow')
return dataset
else:
raise ValueError('Unsupported type')
@@ -91,20 +93,22 @@ def run_single_op(self, dataset, op, column_names):
dataset = dataset.to_pandas().get(column_names)
if dataset is None:
return []
return dataset.to_dict(orient="records")
return dataset.to_dict(orient='records')
else:
raise ValueError('Unsupported type')

def assertDatasetEqual(self, first, second):

def convert_record(rec):
for key in rec.keys():
# Convert incomparable `list` to comparable `tuple`
if isinstance(rec[key], numpy.ndarray) or isinstance(rec[key], list):
if isinstance(rec[key], numpy.ndarray) or isinstance(
rec[key], list):
rec[key] = tuple(rec[key])
return rec

first = [convert_record(d) for d in first]
second = [convert_record(d) for d in second]
first = sorted(first, key=lambda x: tuple(sorted(x.items())))
second = sorted(second, key=lambda x: tuple(sorted(x.items())))
return self.assertEqual(first, second)
return self.assertEqual(first, second)
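As a usage illustration of the helpers above, here is a hedged sketch of how a tagged test case might drive both backends through `generate_dataset`, `run_single_op`, and `assertDatasetEqual`. The base-class name `DataJuicerTestCaseBase` and the `TextLengthFilter` operator (with a `min_len` argument) are assumptions about the surrounding repository, not shown in this diff.

import unittest

from data_juicer.ops.filter.text_length_filter import TextLengthFilter
from data_juicer.utils.unittest_utils import (TEST_TAG,
                                              DataJuicerTestCaseBase)


class TextLengthFilterTest(DataJuicerTestCaseBase):

    @TEST_TAG('standalone', 'ray')
    def test_keep_long_text(self):
        data = [{'text': 'hi'}, {'text': 'a much longer sample text'}]
        op = TextLengthFilter(min_len=10)  # assumed operator signature
        dataset = self.generate_dataset(data)
        result = self.run_single_op(dataset, op, ['text'])
        self.assertDatasetEqual(result,
                                [{'text': 'a much longer sample text'}])


if __name__ == '__main__':
    unittest.main()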
4 changes: 2 additions & 2 deletions tests/run.py
@@ -19,7 +19,7 @@
sys.path.append(file_dir)

parser = argparse.ArgumentParser('test runner')
parser.add_argument('--tag', choices=["standalone", "standalone-gpu", "ray", "ray-gpu"],
parser.add_argument('--tag', choices=["standalone", "ray"],
default="standalone",
help="the tag of tests being run")
parser.add_argument('--pattern', default='test_*.py', help='test file pattern')
@@ -30,7 +30,7 @@


class TaggedTestLoader(unittest.TestLoader):
def __init__(self, tag=None):
def __init__(self, tag="standalone"):
super().__init__()
self.tag = tag

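The loader tweak above only changes the default tag; for context, one plausible (assumed, not verbatim from the repository) way such a loader can restrict collection to tagged tests is to check a `tags` attribute that `TEST_TAG` attaches to each decorated method:

import unittest


class TaggedTestLoader(unittest.TestLoader):
    """Sketch: keep only test methods carrying the requested tag."""

    def __init__(self, tag='standalone'):
        super().__init__()
        self.tag = tag

    def getTestCaseNames(self, testCaseClass):
        names = super().getTestCaseNames(testCaseClass)
        # Assumption: TEST_TAG stores its arguments on the function as
        # `tags`; undecorated tests are treated as matching every tag.
        return [
            name for name in names
            if self.tag in getattr(getattr(testCaseClass, name), 'tags',
                                   (self.tag, ))
        ]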
