diff --git a/docs/sections/learn/advanced/distiset.md b/docs/sections/learn/advanced/distiset.md
index 17be198ae..7c5e12261 100644
--- a/docs/sections/learn/advanced/distiset.md
+++ b/docs/sections/learn/advanced/distiset.md
@@ -70,6 +70,31 @@ distiset.push_to_hub(
 )
 ```
 
+### Save and load from disk
+
+Save the [`Distiset`][distilabel.distiset.Distiset] to disk, and optionally (enabled by default) save the dataset card, the pipeline config file and the logs:
+
+```python
+distiset.save_to_disk(
+    "my-dataset",
+    save_card=True,
+    save_pipeline_config=True,
+    save_pipeline_log=True
+)
+```
+
+A [`Distiset`][distilabel.distiset.Distiset] that was saved using [`Distiset.save_to_disk`][distilabel.distiset.Distiset.save_to_disk] can be loaded back from disk in much the same way:
+
+```python
+from distilabel.distiset import Distiset
+
+distiset = Distiset.load_from_disk("my-dataset")
+```
+
+Note that these methods work in the same way as `datasets.load_from_disk` and `datasets.Dataset.save_to_disk`, so the extra arguments are passed directly to those methods. This means you can also make use of the `storage_options` argument to save your [`Distiset`][distilabel.distiset.Distiset] in your cloud provider, including the distilabel artifacts (`pipeline.yaml`, `pipeline.log` and the `README.md` with the dataset card). You can read more about it in the `datasets` documentation [here](https://huggingface.co/docs/datasets/filesystems#saving-serialized-datasets).
+
+Take a look at the remaining arguments at [`Distiset.save_to_disk`][distilabel.distiset.Distiset.save_to_disk] and [`Distiset.load_from_disk`][distilabel.distiset.Distiset.load_from_disk].
+
 ## Dataset card
 
 Having this special type of dataset comes with an added advantage when calling [`Distiset.push_to_hub`][distilabel.distiset.Distiset], which is the automatically generated dataset card in the Hugging Face Hub. Note that it is enabled by default, but can be disabled by setting `generate_card=False`:
diff --git a/pyproject.toml b/pyproject.toml
index e22c80a16..8883eb0bc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,7 @@ dependencies = [
     "typer >= 0.9.0",
     "tblib >= 3.0.0",
     "orjson >= 3.10.0",
+    "universal_pathlib >= 0.2.2",
 ]
 dynamic = ["version"]
 
diff --git a/src/distilabel/distiset.py b/src/distilabel/distiset.py
index 98c11009f..3538bccd3 100644
--- a/src/distilabel/distiset.py
+++ b/src/distilabel/distiset.py
@@ -13,13 +13,20 @@
 # limitations under the License.
 
 import logging
+import os.path as posixpath
 import re
+from os import PathLike
 from pathlib import Path
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Final, Optional, Union
 
-from datasets import load_dataset
+import fsspec
+import yaml
+from datasets import Dataset, load_dataset, load_from_disk
+from datasets.filesystems import is_remote_filesystem
 from huggingface_hub import DatasetCardData, HfApi
+from huggingface_hub.file_download import hf_hub_download
 from pyarrow.lib import ArrowInvalid
+from typing_extensions import Self
 
 from distilabel.utils.card.dataset_card import (
     DistilabelDatasetCard,
@@ -27,6 +34,10 @@
 )
 from distilabel.utils.files import list_files_in_dir
 
+DISTISET_CONFIG_FOLDER: Final[str] = "distiset_configs"
+PIPELINE_CONFIG_FILENAME: Final[str] = "pipeline.yaml"
+PIPELINE_LOG_FILENAME: Final[str] = "pipeline.log"
+
 
 class Distiset(dict):
     """Convenient wrapper around `datasets.Dataset` to push to the Hugging Face Hub.
@@ -40,8 +51,8 @@ class Distiset(dict):
         pipeline.
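+
+    Examples:
+        A minimal save/load round-trip (a sketch; it assumes an already populated `Distiset`):
+
+        ```python
+        >>> distiset.save_to_disk("my-distiset")
+        >>> distiset = Distiset.load_from_disk("my-distiset")
+        ```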
""" - pipeline_path: Optional[Path] = None - log_filename_path: Optional[Path] = None + _pipeline_path: Optional[Path] = None + _log_filename_path: Optional[Path] = None def push_to_hub( self, @@ -83,14 +94,23 @@ def push_to_hub( if generate_card: self._generate_card(repo_id, token) - def _generate_card(self, repo_id: str, token: Optional[str]) -> None: - """Generates a dataset card and pushes it to the Hugging Face Hub, and - if the `pipeline.yaml` path is available in the `Distiset`, uploads that - to the same repository. + def _get_card( + self, repo_id: str, token: Optional[str] = None + ) -> DistilabelDatasetCard: + """Generates the dataset card for the `Distiset`. + + Note: + If `repo_id` and `token` are provided, it will extract the metadata from the README.md file + on the hub. Args: - repo_id: The ID of the repository to push to, from the `push_to_hub` method. - token: The token to authenticate with the Hugging Face Hub, from the `push_to_hub` method. + repo_id: Name of the repository to push to, or the path for the distiset if saved to disk. + token: The token to authenticate with the Hugging Face Hub. + We assume that if it's provided, the dataset will be in the Hugging Face Hub, + so the README metadata will be extracted from there. + + Returns: + The dataset card for the `Distiset`. """ sample_records = {} for name, dataset in self.items(): @@ -98,8 +118,12 @@ def _generate_card(self, repo_id: str, token: Optional[str]) -> None: dataset[0] if not isinstance(dataset, dict) else dataset["train"][0] ) + readme_metadata = {} + if repo_id and token: + readme_metadata = self._extract_readme_metadata(repo_id, token) + metadata = { - **self._extract_readme_metadata(repo_id, token), + **readme_metadata, "size_categories": size_categories_parser( max(len(dataset) for dataset in self.values()) ), @@ -111,29 +135,8 @@ def _generate_card(self, repo_id: str, token: Optional[str]) -> None: repo_id=repo_id, sample_records=sample_records, ) - card.push_to_hub( - repo_id, - repo_type="dataset", - token=token, - ) - if self.pipeline_path: - # If the pipeline.yaml is available, upload it to the Hugging Face Hub as well. - HfApi().upload_file( - path_or_fileobj=self.pipeline_path, - path_in_repo="pipeline.yaml", - repo_id=repo_id, - repo_type="dataset", - token=token, - ) - if self.log_filename_path: - # The same we had with "pipeline.yaml" but with the log file. - HfApi().upload_file( - path_or_fileobj=self.log_filename_path, - path_in_repo="pipeline.log", - repo_id=repo_id, - repo_type="dataset", - token=token, - ) + + return card def _extract_readme_metadata( self, repo_id: str, token: Optional[str] @@ -150,11 +153,6 @@ def _extract_readme_metadata( Returns: The metadata extracted from the README.md file of the dataset repository as a dict. """ - import re - - import yaml - from huggingface_hub.file_download import hf_hub_download - readme_path = Path( hf_hub_download(repo_id, "README.md", repo_type="dataset", token=token) ) @@ -163,12 +161,47 @@ def _extract_readme_metadata( metadata = yaml.safe_load(metadata) return metadata + def _generate_card(self, repo_id: str, token: str) -> None: + """Generates a dataset card and pushes it to the Hugging Face Hub, and + if the `pipeline.yaml` path is available in the `Distiset`, uploads that + to the same repository. + + Args: + repo_id: The ID of the repository to push to, from the `push_to_hub` method. + token: The token to authenticate with the Hugging Face Hub, from the `push_to_hub` method. 
+        """
+        card = self._get_card(repo_id=repo_id, token=token)
+
+        card.push_to_hub(
+            repo_id,
+            repo_type="dataset",
+            token=token,
+        )
+        if self.pipeline_path:
+            # If the pipeline.yaml is available, upload it to the Hugging Face Hub as well.
+            HfApi().upload_file(
+                path_or_fileobj=self.pipeline_path,
+                path_in_repo=PIPELINE_CONFIG_FILENAME,
+                repo_id=repo_id,
+                repo_type="dataset",
+                token=token,
+            )
+        if self.log_filename_path:
+            # Do the same as with the `pipeline.yaml` file, but for the log file.
+            HfApi().upload_file(
+                path_or_fileobj=self.log_filename_path,
+                path_in_repo=PIPELINE_LOG_FILENAME,
+                repo_id=repo_id,
+                repo_type="dataset",
+                token=token,
+            )
+
     def train_test_split(
         self,
         train_size: float,
         shuffle: bool = True,
         seed: Optional[int] = None,
-    ) -> "Distiset":
+    ) -> Self:
         """Return a `Distiset` whose values will be a `datasets.DatasetDict` with two random train and test subsets.
         Splits are created from the dataset according to `train_size` and `shuffle`.
 
@@ -192,6 +225,198 @@
         )
         return self
 
+    def save_to_disk(
+        self,
+        distiset_path: PathLike,
+        max_shard_size: Optional[Union[str, int]] = None,
+        num_shards: Optional[int] = None,
+        num_proc: Optional[int] = None,
+        storage_options: Optional[dict] = None,
+        save_card: bool = True,
+        save_pipeline_config: bool = True,
+        save_pipeline_log: bool = True,
+    ) -> None:
+        r"""
+        Saves a `Distiset` to a dataset directory, or to a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.
+
+        In case you want to save the `Distiset` in a remote filesystem, you can pass the `storage_options` parameter
+        as you would do with `datasets`' `Dataset.save_to_disk` method: [see example](https://huggingface.co/docs/datasets/filesystems#saving-serialized-datasets)
+
+        Args:
+            distiset_path: Path where you want to save the `Distiset`. It can be a local path
+                (e.g. `dataset/train`) or a remote URI (e.g. `s3://my-bucket/dataset/train`).
+            max_shard_size: The maximum size of the dataset shards to be written to disk.
+                If expressed as a string, needs to be digits followed by a unit (like `"50MB"`).
+                Defaults to `None`.
+            num_shards: Number of shards to write. By default the number of shards depends on
+                `max_shard_size` and `num_proc`. Defaults to `None`.
+            num_proc: Number of processes used when saving the dataset locally.
+                Multiprocessing is disabled by default. Defaults to `None`.
+            storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+                Defaults to `None`.
+            save_card: Whether to save the dataset card. Defaults to `True`.
+            save_pipeline_config: Whether to save the pipeline configuration file (aka the `pipeline.yaml` file).
+                Defaults to `True`.
+            save_pipeline_log: Whether to save the pipeline log file (aka the `pipeline.log` file).
+                Defaults to `True`.
+
+        Examples:
+            ```python
+            # Save your distiset in a local folder:
+            >>> distiset.save_to_disk(distiset_path="my-distiset")
+            # Save your distiset in a remote storage:
+            >>> storage_options = {
+            ...     "key": os.environ["S3_ACCESS_KEY"],
+            ...     "secret": os.environ["S3_SECRET_KEY"],
+            ...     "client_kwargs": {
+            ...         "endpoint_url": os.environ["S3_ENDPOINT_URL"],
+            ...         "region_name": os.environ["S3_REGION"],
+            ...     },
+            ... }
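+            # `storage_options` is forwarded to the underlying `fsspec` filesystem: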
+            >>> distiset.save_to_disk(distiset_path="my-distiset", storage_options=storage_options)
+            ```
+        """
+        distiset_path = str(distiset_path)
+        for name, dataset in self.items():
+            dataset.save_to_disk(
+                f"{distiset_path}/{name}",
+                max_shard_size=max_shard_size,
+                num_shards=num_shards,
+                num_proc=num_proc,
+                storage_options=storage_options,
+            )
+
+        distiset_config_folder = posixpath.join(distiset_path, DISTISET_CONFIG_FOLDER)
+
+        fs: fsspec.AbstractFileSystem
+        fs, _, _ = fsspec.get_fs_token_paths(
+            distiset_config_folder, storage_options=storage_options
+        )
+        fs.makedirs(distiset_config_folder, exist_ok=True)
+
+        if save_card:
+            # NOTE: Currently the card written to disk is not the same as the one pushed to the
+            # HF hub, as we aren't generating the README by copying/updating the data from the dataset repo.
+            card = self._get_card(repo_id=Path(distiset_path).stem, token=None)
+            new_filename = posixpath.join(distiset_config_folder, "README.md")
+            if storage_options:
+                # Write the card the same way as `DatasetCard.save` does:
+                with fs.open(new_filename, "w", newline="", encoding="utf-8") as f:
+                    f.write(str(card))
+            else:
+                card.save(new_filename)
+
+        # Copy the distilabel internal files (pipeline config and log) to the distiset config folder.
+        if save_pipeline_config and self.pipeline_path:
+            new_filename = posixpath.join(
+                distiset_config_folder, PIPELINE_CONFIG_FILENAME
+            )
+            if self.pipeline_path.exists() and (not fs.isfile(new_filename)):
+                data = yaml.safe_load(self.pipeline_path.read_text())
+                with fs.open(new_filename, "w", encoding="utf-8") as f:
+                    yaml.dump(data, f, default_flow_style=False)
+
+        if save_pipeline_log and self.log_filename_path:
+            new_filename = posixpath.join(distiset_config_folder, PIPELINE_LOG_FILENAME)
+            if self.log_filename_path.exists() and (not fs.isfile(new_filename)):
+                data = self.log_filename_path.read_text()
+                with fs.open(new_filename, "w", encoding="utf-8") as f:
+                    f.write(data)
+
+    @classmethod
+    def load_from_disk(
+        cls,
+        distiset_path: PathLike,
+        keep_in_memory: Optional[bool] = None,
+        storage_options: Optional[Dict[str, Any]] = None,
+        download_dir: Optional[PathLike] = None,
+    ) -> Self:
+        """Loads a dataset that was previously saved using `Distiset.save_to_disk` from a dataset
+        directory, or from a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.
+
+        Args:
+            distiset_path: Path ("dataset/train") or remote URI ("s3://bucket/dataset/train").
+            keep_in_memory: Whether to copy the dataset in-memory, see `datasets.Dataset.load_from_disk`
+                for more information. Defaults to `None`.
+            storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+                Defaults to `None`.
+            download_dir: Optional directory to download the dataset to. Defaults to `None`,
+                in which case it will create a temporary directory.
+
+        Returns:
+            A `Distiset` loaded from disk; it should be a `Distiset` object created using `Distiset.save_to_disk`.
+        """
+        original_distiset_path = str(distiset_path)
+
+        fs: fsspec.AbstractFileSystem
+        fs, _, [distiset_path] = fsspec.get_fs_token_paths(
+            original_distiset_path, storage_options=storage_options
+        )
+        dest_distiset_path = distiset_path
+
+        assert fs.isdir(
+            original_distiset_path
+        ), "`distiset_path` must be a `PathLike` object pointing to a folder or a URI of a remote filesystem."
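+        # NOTE: `dest_distiset_path` still points at `distiset_path` here; it is
+        # swapped for a local download directory below when `fs` is remote.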
+
+        has_config = False
+        distiset = cls()
+
+        if is_remote_filesystem(fs):
+            src_dataset_path = distiset_path
+            if download_dir:
+                dest_distiset_path = download_dir
+            else:
+                dest_distiset_path = Dataset._build_local_temp_path(src_dataset_path)
+            fs.download(src_dataset_path, dest_distiset_path.as_posix(), recursive=True)
+
+        # Now we should have the distiset locally, so we can read its files.
+        for folder in Path(dest_distiset_path).iterdir():
+            if folder.stem == DISTISET_CONFIG_FOLDER:
+                has_config = True
+                continue
+            distiset[folder.stem] = load_from_disk(
+                str(folder),
+                keep_in_memory=keep_in_memory,
+            )
+        # For the config folder we just need to point to the files; once downloaded, we set
+        # the paths to wherever they are.
+        if has_config:
+            distiset_config_folder = posixpath.join(
+                dest_distiset_path, DISTISET_CONFIG_FOLDER
+            )
+
+            pipeline_path = posixpath.join(
+                distiset_config_folder, PIPELINE_CONFIG_FILENAME
+            )
+            if Path(pipeline_path).exists():
+                distiset.pipeline_path = Path(pipeline_path)
+
+            log_filename_path = posixpath.join(
+                distiset_config_folder, PIPELINE_LOG_FILENAME
+            )
+            if Path(log_filename_path).exists():
+                distiset.log_filename_path = Path(log_filename_path)
+
+        return distiset
+
+    @property
+    def pipeline_path(self) -> Union[Path, None]:
+        """Returns the path to the `pipeline.yaml` file of the pipeline that generated the `Distiset`."""
+        return self._pipeline_path
+
+    @pipeline_path.setter
+    def pipeline_path(self, path: PathLike) -> None:
+        self._pipeline_path = Path(path)
+
+    @property
+    def log_filename_path(self) -> Union[Path, None]:
+        """Returns the path to the `pipeline.log` file with the logs of the run that generated the `Distiset`."""
+        return self._log_filename_path
+
+    @log_filename_path.setter
+    def log_filename_path(self, path: PathLike) -> None:
+        self._log_filename_path = Path(path)
+
     def __repr__(self):
         # Copy from `datasets.DatasetDict.__repr__`.
         repr = "\n".join([f"{k}: {v}" for k, v in self.items()])
@@ -207,6 +432,9 @@ def create_distiset(  # noqa: C901
 ) -> Distiset:
     """Creates a `Distiset` from the buffer folder.
 
+    This function is intended to be used as a helper to create a `Distiset` from the folder
+    where the cached data was written by the `_WriteBuffer`.
+
     Args:
         data_dir: Folder where the data buffers were written by the `_WriteBuffer`.
             It should correspond to `CacheLocation.data`.
@@ -222,6 +450,13 @@
     Returns:
         The dataset created from the buffer folder, where the different leaf steps will
         correspond to different configurations of the dataset.
+
+    Examples:
+
+        ```python
+        >>> from pathlib import Path
+        >>> distiset = create_distiset(Path.home() / ".cache/distilabel/pipelines/path-to-pipe-hashname")
+        ```
     """
     from distilabel.steps.constants import DISTILABEL_METADATA_KEY
 
diff --git a/tests/unit/test_distiset.py b/tests/unit/test_distiset.py
index 18de3f876..07e6549d7 100644
--- a/tests/unit/test_distiset.py
+++ b/tests/unit/test_distiset.py
@@ -12,9 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
+import re
+import tempfile
+from pathlib import Path
+from typing import Any, Dict, Optional
+
 import pytest
+import yaml
 from datasets import Dataset, DatasetDict
 from distilabel.distiset import Distiset
+from upath import UPath
 
 
 @pytest.fixture(scope="function")
@@ -27,6 +35,24 @@ def distiset():
     )
 
 
+def make_fake_file(filename: Path) -> None:
+    if not filename.parent.exists():
+        filename.parent.mkdir(parents=True)
+    filename.touch()
+
+
+def add_config_to_distiset(distiset: Distiset, folder: Path) -> Distiset:
+    from distilabel.distiset import DISTISET_CONFIG_FOLDER
+
+    pipeline_yaml = folder / DISTISET_CONFIG_FOLDER / "pipeline.yaml"
+    pipeline_log = folder / DISTISET_CONFIG_FOLDER / "pipeline.log"
+    make_fake_file(pipeline_yaml)
+    make_fake_file(pipeline_log)
+    distiset.pipeline_path = pipeline_yaml
+    distiset.log_filename_path = pipeline_log
+    return distiset
+
+
 class TestDistiset:
     def test_train_test_split(self, distiset: Distiset) -> None:
         assert isinstance(distiset["leaf_step_1"], Dataset)
@@ -34,3 +60,111 @@ def test_train_test_split(self, distiset: Distiset) -> None:
         assert isinstance(ds, Distiset)
         assert len(ds) == 2
         assert isinstance(ds["leaf_step_1"], DatasetDict)
+
+    @pytest.mark.parametrize("storage_options", [None, {"test": "option"}])
+    @pytest.mark.parametrize("with_config", [False, True])
+    def test_save_to_disk(
+        self,
+        distiset: Distiset,
+        with_config: bool,
+        storage_options: Optional[Dict[str, Any]],
+    ) -> None:
+        full_distiset = copy.deepcopy(distiset)
+        # Distiset with Dataset
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            folder = Path(tmpdirname) / "distiset_folder"
+            if with_config:
+                full_distiset = add_config_to_distiset(full_distiset, folder)
+
+            full_distiset.save_to_disk(
+                folder,
+                save_card=with_config,
+                save_pipeline_config=with_config,
+                save_pipeline_log=with_config,
+                storage_options=storage_options,
+            )
+            assert folder.is_dir()
+            assert len(list(folder.iterdir())) == 3
+
+        full_distiset = copy.deepcopy(distiset)
+        # Distiset with DatasetDict
+        distiset_with_dict = full_distiset.train_test_split(0.8)
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            folder = Path(tmpdirname) / "distiset_folder"
+            if with_config:
+                distiset_with_dict = add_config_to_distiset(distiset_with_dict, folder)
+
+            distiset_with_dict.save_to_disk(
+                folder,
+                save_card=with_config,
+                save_pipeline_config=with_config,
+                save_pipeline_log=with_config,
+            )
+
+            assert folder.is_dir()
+            assert len(list(folder.iterdir())) == 3
+
+    @pytest.mark.parametrize("pathlib_implementation", [Path, UPath])
+    @pytest.mark.parametrize("storage_options", [None, {"project": "experiments"}])
+    @pytest.mark.parametrize("with_config", [False, True])
+    def test_load_from_disk(
+        self,
+        distiset: Distiset,
+        with_config: bool,
+        storage_options: Optional[Dict[str, Any]],
+        pathlib_implementation: type,
+    ) -> None:
+        full_distiset = copy.deepcopy(distiset)
+        # Distiset with Dataset
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            # This way we also test that we work with `UPath`, using the `FilePath`
+            # protocol; it should behave the same as `S3Path`, `GCSPath`, etc.
+            folder = pathlib_implementation(tmpdirname) / "distiset_folder"
+            if with_config:
+                full_distiset = add_config_to_distiset(full_distiset, folder)
+            full_distiset.save_to_disk(
+                folder,
+                save_card=with_config,
+                save_pipeline_config=with_config,
+                save_pipeline_log=with_config,
+                storage_options=storage_options,
+            )
+            ds = Distiset.load_from_disk(
+                folder,
+                storage_options=storage_options,
+            )
+            assert isinstance(ds, Distiset)
+            assert isinstance(ds["leaf_step_1"], Dataset)
+
+            if with_config:
+                assert ds.pipeline_path.exists()
+                assert ds.log_filename_path.exists()
+
+        full_distiset = copy.deepcopy(distiset)
+        # Distiset with DatasetDict
+        distiset_with_dict = full_distiset.train_test_split(0.8)
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            folder = pathlib_implementation(tmpdirname) / "distiset_folder"
+            if with_config:
+                distiset_with_dict = add_config_to_distiset(distiset_with_dict, folder)
+
+            distiset_with_dict.save_to_disk(folder)
+            ds = Distiset.load_from_disk(folder, storage_options=storage_options)
+
+            assert folder.is_dir()
+            assert isinstance(ds["leaf_step_1"], DatasetDict)
+
+            if with_config:
+                assert ds.pipeline_path.exists()
+                assert ds.log_filename_path.exists()
+
+    def test_dataset_card(self, distiset: Distiset) -> None:
+        # Test the metadata we generate by default, without extracting the already
+        # generated content from the HF hub.
+        # We parse the content and check it's the same as the one we generate.
+        distiset_card = distiset._get_card("repo_name_or_path")
+        metadata = re.findall(r"---\n(.*?)\n---", str(distiset_card), re.DOTALL)[0]
+        metadata = yaml.safe_load(metadata)
+        assert metadata == {
+            "size_categories": "n<1K",
+            "tags": ["synthetic", "distilabel", "rlaif"],
+        }
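+
+    def test_save_to_disk_load_from_disk_roundtrip(self, distiset: Distiset) -> None:
+        # A minimal round-trip sketch (assumes the `distiset` fixture above): the
+        # datasets written by `save_to_disk` should come back unchanged through
+        # `load_from_disk`.
+        full_distiset = copy.deepcopy(distiset)
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            folder = Path(tmpdirname) / "distiset_folder"
+            full_distiset.save_to_disk(folder)
+            ds = Distiset.load_from_disk(folder)
+
+            assert sorted(ds.keys()) == sorted(full_distiset.keys())
+            for name, dataset in full_distiset.items():
+                # `Dataset[:]` returns the underlying columns as a dict of lists.
+                assert ds[name][:] == dataset[:]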