Fix GPU CI tests by creating one cluster shared across the test session (#540)
* Fix GPU CI tests

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

* Remove unused imports

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

* Skip GPU cluster creation when pytest is run in CPU-only mode (--cpu)

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

---------

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
VibhuJawa authored Feb 12, 2025
1 parent c4cb682 commit 6f782a6
Showing 5 changed files with 66 additions and 68 deletions.
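
In short: each GPU test class previously created its own LocalCUDACluster in a class-scoped fixture; this commit moves cluster creation into a single session-scoped, autouse gpu_client fixture in conftest.py, so every GPU test shares one cluster and no cluster is started when the suite runs in CPU-only mode. Below is a minimal sketch of how the two modes might be invoked locally; the test path and marker expression are illustrative assumptions, not part of the commit.

import pytest

# GPU run: gpu-marked tests share the single session-scoped LocalCUDACluster
# created by the autouse gpu_client fixture in conftest.py.
pytest.main(["-m", "gpu", "tests/"])

# CPU-only run: gpu_client yields None instead of starting a cluster, and
# gpu-marked tests are skipped during collection.
pytest.main(["--cpu", "tests/"])
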
20 changes: 20 additions & 0 deletions conftest.py
@@ -1,4 +1,11 @@
import pytest
from dask.distributed import Client

from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")


def pytest_addoption(parser):
@@ -13,3 +20,16 @@ def pytest_collection_modifyitems(config, items):
for item in items:
if "gpu" in item.keywords:
item.add_marker(skip_gpu)


@pytest.fixture(autouse=True, scope="session")
def gpu_client(request):
if not request.config.getoption("--cpu"):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.session.client = client
request.session.cluster = cluster
yield client
client.close()
cluster.close()
else:
yield None
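
The pytest_addoption and pytest_collection_modifyitems bodies are collapsed in the hunk above. A minimal sketch of what they plausibly contain, inferred from the --cpu option the new fixture reads and the visible skip_gpu loop; the exact help text and skip reason are assumptions:

def pytest_addoption(parser):
    # Assumed definition of the --cpu flag that gpu_client reads via getoption("--cpu").
    parser.addoption(
        "--cpu", action="store_true", default=False, help="Run tests without a GPU cluster"
    )


def pytest_collection_modifyitems(config, items):
    # Only mark GPU tests for skipping when the suite runs in CPU-only mode.
    if not config.getoption("--cpu"):
        return
    skip_gpu = pytest.mark.skip(reason="--cpu was passed: skipping GPU-marked tests")
    for item in items:
        if "gpu" in item.keywords:
            item.add_marker(skip_gpu)
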
63 changes: 29 additions & 34 deletions tests/test_backends.py
@@ -14,7 +14,6 @@
import pandas as pd
import pytest
from dask.dataframe.utils import assert_eq
from distributed import Client

from nemo_curator import (
BaseModule,
@@ -26,11 +25,10 @@
)
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import MeanWordLengthFilter
from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
from nemo_curator.utils.import_utils import gpu_only_import

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")


class CPUModule(BaseModule):
@@ -98,18 +96,12 @@ def gpu_data(raw_data):

@pytest.mark.gpu
class TestBackendSupport:
@pytest.fixture(autouse=True, scope="class")
def gpu_client(self, request):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.cls.client = client
request.cls.cluster = cluster
yield

def test_pandas_backend(
self,
cpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, gt_lengths = cpu_data
pipeline = CPUModule()
result = pipeline(dataset)
@@ -119,8 +111,9 @@ def test_pandas_backend(
def test_cudf_backend(
self,
gpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, gt_lengths = gpu_data
pipeline = GPUModule()
result = pipeline(dataset)
@@ -131,8 +124,9 @@ def test_any_backend(
self,
cpu_data,
gpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
cpu_dataset, gt_cpu_lengths = cpu_data
gt_cpu_lengths = gt_cpu_lengths.rename("any_lengths")
gpu_dataset, gt_gpu_lengths = gpu_data
@@ -150,8 +144,9 @@ def test_pandas_to_cudf(
self,
cpu_data,
gpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, gt_cpu_lengths = cpu_data
_, gt_gpu_lengths = gpu_data
pipeline = Sequential(
@@ -170,8 +165,9 @@ def test_cudf_to_pandas(
self,
cpu_data,
gpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
_, gt_cpu_lengths = cpu_data
dataset, gt_gpu_lengths = gpu_data
pipeline = Sequential(
@@ -190,8 +186,9 @@ def test_5x_switch(
self,
cpu_data,
gpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, gt_cpu_lengths = cpu_data
_, gt_gpu_lengths = gpu_data
pipeline = Sequential(
@@ -220,25 +217,25 @@ def test_5x_switch(
assert_eq(result_df["cpu_lengths"], gt_cpu_lengths)
assert_eq(result_df["gpu_lengths"], gt_gpu_lengths)

def test_wrong_backend_cpu_data(self, cpu_data):
def test_wrong_backend_cpu_data(self, cpu_data, gpu_client):
with pytest.raises(ValueError):
print("client", self.client)
print("client", gpu_client)
dataset, _ = cpu_data
pipeline = GPUModule()
result = pipeline(dataset)
_ = result.df.compute()

def test_wrong_backend_gpu_data(self, gpu_data):
def test_wrong_backend_gpu_data(self, gpu_data, gpu_client):
with pytest.raises(ValueError):
print("client", self.client)
print("client", gpu_client)
dataset, _ = gpu_data
pipeline = CPUModule()
result = pipeline(dataset)
_ = result.df.compute()

def test_unsupported_to_backend(self, cpu_data):
def test_unsupported_to_backend(self, cpu_data, gpu_client):
with pytest.raises(ValueError):
print("client", self.client)
print("client", gpu_client)
dataset, _ = cpu_data
pipeline = ToBackend("fake_backend")
result = pipeline(dataset)
@@ -281,18 +278,12 @@ def real_module_gpu_data(real_module_raw_data):

@pytest.mark.gpu
class TestRealModules:
@pytest.fixture(autouse=True, scope="class")
def gpu_client(self, request):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.cls.client = client
request.cls.cluster = cluster
yield

def test_score_filter(
self,
real_module_cpu_data,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, gt_results = real_module_cpu_data
pipeline = ScoreFilter(
MeanWordLengthFilter(), score_field="mean_lengths", score_type=float
@@ -304,9 +295,10 @@ def test_score_filter(
def test_score_filter_wrong_backend(
self,
real_module_gpu_data,
gpu_client,
):
with pytest.raises(ValueError):
print("client", self.client)
print("client", gpu_client)
dataset, _ = real_module_gpu_data
pipeline = ScoreFilter(
MeanWordLengthFilter(), score_field="mean_lengths", score_type=float
@@ -318,8 +310,9 @@ def test_fuzzy_dedup(
self,
real_module_gpu_data,
tmpdir,
gpu_client,
):
print(self.client)
print(gpu_client)
dataset, gt_results = real_module_gpu_data
# Dedup might fail when indices per partition do not start from 0
dataset.df = dataset.df.reset_index(drop=True)
@@ -355,9 +348,10 @@ def test_fuzzy_dedup_wrong_backend(
self,
real_module_cpu_data,
tmpdir,
gpu_client,
):
with pytest.raises(ValueError):
print(self.client)
print(gpu_client)
dataset, _ = real_module_cpu_data
# Dedup might fail when indices per partition do not start from 0
dataset.df = dataset.df.reset_index(drop=True)
@@ -384,8 +378,9 @@ def test_score_filter_and_fuzzy(
real_module_cpu_data,
real_module_gpu_data,
tmpdir,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
dataset, _ = real_module_cpu_data
_, gt_results = real_module_gpu_data
dataset.df = dataset.df.reset_index(drop=True)
12 changes: 1 addition & 11 deletions tests/test_classifiers.py
@@ -13,22 +13,12 @@
# limitations under the License.

import pytest
from distributed import Client

from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
from nemo_curator.utils.import_utils import gpu_only_import

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")


@pytest.fixture
def gpu_client(request):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.client = client
request.cluster = cluster
yield


@pytest.fixture
28 changes: 14 additions & 14 deletions tests/test_fuzzy_dedup.py
@@ -22,16 +22,14 @@
import yaml
from dask import config
from dask.dataframe.utils import assert_eq
from distributed import Client

from nemo_curator import LSH, FuzzyDuplicates, FuzzyDuplicatesConfig, MinHash
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.fuzzy_dedup_utils.merge_utils import extract_partitioning_index
from nemo_curator.utils.import_utils import gpu_only_import, gpu_only_import_from
from nemo_curator.utils.import_utils import gpu_only_import

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")


@pytest.fixture
@@ -303,13 +301,6 @@ def test_partial_overlap(self, tmpdir, false_positive_check):

@pytest.mark.gpu
class TestFuzzyDuplicates:
@pytest.fixture(autouse=True, scope="class")
def gpu_client(self, request):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.cls.client = client
request.cls.cluster = cluster
yield

@pytest.mark.parametrize("use_64_bit_hash", [False, True])
@pytest.mark.parametrize(
"num_buckets,jaccard_threshold,duplicate_docs",
@@ -328,8 +319,9 @@ def test_fuzzy_dedup(
jaccard_threshold,
duplicate_docs,
tmpdir,
gpu_client,
):
print(self.client)
print(gpu_client)
# Dedup might fail when indices per partition do not start from 0
fuzzy_dedup_data.df = fuzzy_dedup_data.df.reset_index(drop=True)
config = FuzzyDuplicatesConfig(
@@ -408,8 +400,9 @@ def test_different_fields(self, fuzzy_dedup_data, tmpdir):
def test_non_uniform_indices(
self,
tmpdir,
gpu_client,
):
print(self.client)
print(gpu_client)
# Dedup might fail when indices per partition do not start from 0
df = cudf.DataFrame(
{
@@ -498,7 +491,13 @@ def test_num_anchors(self, large_fuzzy_dedup_data, num_anchors, tmpdir):
],
)
def test_no_fp_check(
self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir
self,
fuzzy_dedup_data,
use_64_bit_hash,
num_buckets,
duplicate_docs,
tmpdir,
gpu_client,
):
config = FuzzyDuplicatesConfig(
cache_dir=tmpdir,
@@ -533,6 +532,7 @@ def test_shuffle_fail_fuzzy_dedup_data(
self,
shuffle_fail_fuzzy_dedup_data,
tmpdir,
gpu_client,
):
# Dedup might fail when indices per partition do not start from 0
shuffle_fail_fuzzy_dedup_data.df = shuffle_fail_fuzzy_dedup_data.df.reset_index(
@@ -569,7 +569,7 @@ def test_shuffle_fail_fuzzy_dedup_data(

@pytest.mark.parametrize("false_positive_check", [True, False])
def test_fuzzy_dedup_no_duplicates(
self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check
self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check, gpu_client
):
# Dedup might fail when indices per partition do not start from 0
no_duplicates_fuzzy_dedup_data.df = (
11 changes: 2 additions & 9 deletions tests/test_semdedup.py
@@ -27,7 +27,6 @@

cudf = gpu_only_import("cudf")
dask_cudf = gpu_only_import("dask_cudf")
LocalCUDACluster = gpu_only_import_from("dask_cuda", "LocalCUDACluster")
EmbeddingCreator = gpu_only_import_from(
"nemo_curator.modules.semantic_dedup.embeddings", "EmbeddingCreator"
)
@@ -55,19 +54,13 @@ def dedup_data():

@pytest.mark.gpu
class TestSemDuplicates:
@pytest.fixture(autouse=True, scope="class")
def gpu_client(self, request):
with LocalCUDACluster(n_workers=1) as cluster, Client(cluster) as client:
request.cls.client = client
request.cls.cluster = cluster
yield

def test_sem_dedup(
self,
dedup_data,
tmpdir,
gpu_client,
):
print("client", self.client)
print("client", gpu_client)
cache_dir = os.path.join(tmpdir, "test_sem_dedup_cache")
config = SemDedupConfig(
cache_dir=cache_dir,
