Commit

Merge branch 'main' into develop
gabrielmbmb committed Jan 28, 2025
2 parents 43d1bb1 + 1b6c101 commit 84ea198
Showing 10 changed files with 122 additions and 102 deletions.
35 changes: 7 additions & 28 deletions README.md
@@ -28,7 +28,6 @@
</a>
</p>


Distilabel is the framework for synthetic data and AI feedback for engineers who need fast, reliable and scalable pipelines based on verified research papers.

If you just want to get started, we recommend you check the [documentation](http://distilabel.argilla.io/). Curious, and want to know more? Keep reading!
@@ -119,43 +118,23 @@ pip install "distilabel[hf-inference-endpoints]" --upgrade
Then run:

```diff
+from datasets import load_dataset
+
 from distilabel.models import InferenceEndpointsLLM
 from distilabel.pipeline import Pipeline
-from distilabel.steps import LoadDataFromHub
 from distilabel.steps.tasks import TextGeneration
 
-with Pipeline(
-    name="simple-text-generation-pipeline",
-    description="A simple text generation pipeline",
-) as pipeline:
-    load_dataset = LoadDataFromHub(output_mappings={"prompt": "instruction"})
-
-    text_generation = TextGeneration(
+with Pipeline() as pipeline:
+    TextGeneration(
         llm=InferenceEndpointsLLM(
             model_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
             tokenizer_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
+            generation_kwargs={"temperature": 0.7, "max_new_tokens": 512},
         ),
     )
 
-    load_dataset >> text_generation
-
 if __name__ == "__main__":
-    distiset = pipeline.run(
-        parameters={
-            load_dataset.name: {
-                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
-                "split": "test",
-            },
-            text_generation.name: {
-                "llm": {
-                    "generation_kwargs": {
-                        "temperature": 0.7,
-                        "max_new_tokens": 512,
-                    }
-                }
-            },
-        },
-    )
+    dataset = load_dataset("distilabel-internal-testing/instructions", split="test")
+    distiset = pipeline.run(dataset=dataset)
     distiset.push_to_hub(repo_id="distilabel-example")
```
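
The snippet assumes a Hugging Face token is available to `InferenceEndpointsLLM` (the quickstart below notes that the `HF_TOKEN` environment variable is expected to be set). A minimal, hypothetical pre-flight check along those lines:

```python
import os

# Illustrative only: fail early with a clear message if the token the
# Inference Endpoints client relies on has not been exported.
if "HF_TOKEN" not in os.environ:
    raise RuntimeError("Set the HF_TOKEN environment variable before running the pipeline.")
```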

65 changes: 14 additions & 51 deletions docs/sections/getting_started/quickstart.md
@@ -46,69 +46,32 @@ The `InstructionResponsePipeline` class will use the `InferenceEndpointsLLM` class

## Define a Custom pipeline

In this guide we will walk you through the process of creating a simple pipeline that uses the [`InferenceEndpointsLLM`][distilabel.models.llms.InferenceEndpointsLLM] class to generate text. The [`Pipeline`][distilabel.pipeline.Pipeline] will load a dataset that contains a column named `prompt` from the Hugging Face Hub via the step [`LoadDataFromHub`][distilabel.steps.LoadDataFromHub] and then use the [`InferenceEndpointsLLM`][distilabel.models.llms.InferenceEndpointsLLM] class to generate text based on the dataset using the [`TextGeneration`](https://distilabel.argilla.io/dev/components-gallery/tasks/textgeneration/) task.
In this guide we will walk you through the process of creating a simple pipeline that uses the [InferenceEndpointsLLM][distilabel.models.llms.InferenceEndpointsLLM] class to generate text. The [Pipeline][distilabel.pipeline.Pipeline] will process a dataset loaded directly using the Hugging Face `datasets` library and use the [InferenceEndpointsLLM][distilabel.models.llms.InferenceEndpointsLLM] class to generate text using the [TextGeneration][distilabel.steps.tasks.text_generation.TextGeneration] task.

> You can check the available models in the [Hugging Face Model Hub](https://huggingface.co/models?pipeline_tag=text-generation&sort=trending) and filter by `Inference status`.
```diff
+from datasets import load_dataset
+
 from distilabel.models import InferenceEndpointsLLM
 from distilabel.pipeline import Pipeline
-from distilabel.steps import LoadDataFromHub
 from distilabel.steps.tasks import TextGeneration
 
-with Pipeline( # (1)
-    name="simple-text-generation-pipeline",
-    description="A simple text generation pipeline",
-) as pipeline: # (2)
-    load_dataset = LoadDataFromHub( # (3)
-        output_mappings={"prompt": "instruction"},
-    )
-
-    text_generation = TextGeneration( # (4)
+with Pipeline() as pipeline: # (1)
+    TextGeneration( # (2)
         llm=InferenceEndpointsLLM(
             model_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
             tokenizer_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
-        ), # (5)
-        system_prompt="You are a creative AI Assistant writer.",
-        template="Follow the following instruction: {{ instruction }}" # (6)
+            generation_kwargs={"temperature": 0.7, "max_new_tokens": 512},
+        ),
     )
 
-    load_dataset >> text_generation # (7)
-
 if __name__ == "__main__":
-    distiset = pipeline.run( # (8)
-        parameters={
-            load_dataset.name: {
-                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
-                "split": "test",
-            },
-            text_generation.name: {
-                "llm": {
-                    "generation_kwargs": {
-                        "temperature": 0.7,
-                        "max_new_tokens": 512,
-                    }
-                }
-            },
-        },
-    )
-    distiset.push_to_hub(repo_id="distilabel-example") # (9)
+    dataset = load_dataset("distilabel-internal-testing/instructions", split="test") # (3)
+    distiset = pipeline.run(dataset=dataset)
+    distiset.push_to_hub(repo_id="distilabel-example") # (4)
```

1. We define a [`Pipeline`][distilabel.pipeline.Pipeline] with the name `simple-text-generation-pipeline` and a description `A simple text generation pipeline`. Note that the `name` is mandatory and will be used to calculate the `cache` signature path, so changing the name will change the cache path and will be identified as a different pipeline.

2. We are using the [`Pipeline`][distilabel.pipeline.Pipeline] context manager, meaning that every [`Step`][distilabel.steps.base.Step] subclass that is defined within the context manager will be added to the pipeline automatically.

3. We define a [`LoadDataFromHub`][distilabel.steps.LoadDataFromHub] step named `load_dataset` that will load a dataset from the Hugging Face Hub, as provided via runtime parameters in the `pipeline.run` method below, but it can also be defined within the class instance via the arg `repo_id=...`. This step will produce output batches with the rows from the dataset, and the column `prompt` will be mapped to the `instruction` field.

4. We define a [`TextGeneration`](https://distilabel.argilla.io/dev/components-gallery/tasks/textgeneration/) task named `text_generation` that will generate text based on the `instruction` field from the dataset. This task will use the [`InferenceEndpointsLLM`][distilabel.models.llms.InferenceEndpointsLLM] class with the model `Meta-Llama-3.1-8B-Instruct`.

5. We define the [`InferenceEndpointsLLM`][distilabel.models.llms.InferenceEndpointsLLM] class with the model `Meta-Llama-3.1-8B-Instruct` that will be used by the [`TextGeneration`](https://distilabel.argilla.io/dev/components-gallery/tasks/textgeneration/) task. In this case, since the [`InferenceEndpointsLLM`][distilabel.models.llms.InferenceEndpointsLLM] is used, we assume that the `HF_TOKEN` environment variable is set.

6. Both `system_prompt` and `template` are optional fields. The `template` must be informed as a string following the [Jinja2](https://jinja.palletsprojects.com/en/3.1.x/templates/#synopsis) template format, and the fields that appear there ("instruction" in this case, which corresponds to the default) must be informed in the `columns` attribute. The component gallery for [`TextGeneration`](https://distilabel.argilla.io/dev/components-gallery/tasks/textgeneration/) has examples to get you started.

7. We connect the `load_dataset` step to the `text_generation` task using the `rshift` operator, meaning that the output from the `load_dataset` step will be used as input for the `text_generation` task.

8. We run the pipeline with the parameters for the `load_dataset` and `text_generation` steps. The `load_dataset` step will use the repository `distilabel-internal-testing/instruction-dataset-mini` and the `test` split, and the `text_generation` task will use the `generation_kwargs` with the `temperature` set to `0.7` and the `max_new_tokens` set to `512`.

9. Optionally, we can push the generated [`Distiset`][distilabel.distiset.Distiset] to the Hugging Face Hub repository `distilabel-example`. This will allow you to share the generated dataset with others and use it in other pipelines.
1. We define a [Pipeline][distilabel.pipeline.Pipeline] using its context manager. Any [Step][distilabel.steps.base.Step] subclass defined within the context manager will be automatically added to the pipeline.
2. We define a [TextGeneration][distilabel.steps.tasks.text_generation.TextGeneration] task that uses the [InferenceEndpointsLLM][distilabel.models.llms.InferenceEndpointsLLM] class with the model Meta-Llama-3.1-8B-Instruct. The generation parameters are set directly in the LLM configuration with a temperature of 0.7 and maximum of 512 new tokens.
3. We load the dataset directly using the Hugging Face datasets library from the repository "distilabel-internal-testing/instructions" using the "test" split.
4. Optionally, we can push the generated [Distiset][distilabel.distiset.Distiset] to the Hugging Face Hub repository distilabel-example. This will allow you to share the generated dataset with others and use it in other pipelines.
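
The `system_prompt`, `template`, and `columns` arguments described in the removed callout (6) are not shown in the new snippet; assuming they are still accepted by `TextGeneration`, a minimal sketch of that fuller configuration (the template text is illustrative, any placeholder it uses must be listed in `columns`, and `HF_TOKEN` is assumed to be set):

```python
from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps.tasks import TextGeneration

with Pipeline() as pipeline:
    TextGeneration(
        llm=InferenceEndpointsLLM(
            model_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
            tokenizer_id="meta-llama/Meta-Llama-3.1-8B-Instruct",
            generation_kwargs={"temperature": 0.7, "max_new_tokens": 512},
        ),
        # Optional prompt customization, as in the previous quickstart example.
        system_prompt="You are a creative AI Assistant writer.",
        template="Follow the following instruction: {{ instruction }}",  # Jinja2 template
        columns=["instruction"],  # every placeholder used in `template` must appear here
    )
```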
@@ -572,7 +572,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we can explore the UI and add a final human touch to get he most out of our dataset. "
"Now, we can explore the UI and add a final human touch to get the most out of our dataset. "
]
},
{
21 changes: 19 additions & 2 deletions src/distilabel/distiset.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib.util
import json
import logging
import os.path as posixpath
@@ -52,6 +53,19 @@
from distilabel.pipeline._dag import DAG


def is_PIL_available() -> bool:
    """Checks if the PIL library is available.

    Returns:
        True if the PIL library is available, False otherwise.
    """
    try:
        importlib.util.find_spec("PIL")
    except ImportError:
        return False
    return True


class Distiset(dict):
"""Convenient wrapper around `datasets.Dataset` to push to the Hugging Face Hub.
@@ -187,11 +201,14 @@ def _get_card(
         record = (
             dataset[0] if not isinstance(dataset, dict) else dataset["train"][0]
         )
-        from PIL import ImageFile
+        if is_PIL_available():
+            from PIL import ImageFile
+        else:
+            ImageFile = None
 
         for key, value in record.items():
             # If the value is an image, we set it to an empty string to avoid the `README.md` to huge
-            if isinstance(value, ImageFile.ImageFile):
+            if ImageFile and isinstance(value, ImageFile.ImageFile):
                 value = ""
             # If list is too big, the `README.md` generated will be huge so we truncate it
             elif isinstance(value, list):
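
The guarded import above lets `_get_card` degrade gracefully when Pillow is not installed. A standalone sketch of the same idea (illustrative names only, not the module's code; this variant checks `find_spec(...) is not None` explicitly):

```python
import importlib.util


def pil_available() -> bool:
    # Probe for Pillow without importing it; find_spec returns None when absent.
    return importlib.util.find_spec("PIL") is not None


ImageFile = None
if pil_available():
    from PIL import ImageFile  # only imported when Pillow is present

value = "some dataset cell"
# Without Pillow, ImageFile stays None and the isinstance check is skipped.
if ImageFile is not None and isinstance(value, ImageFile.ImageFile):
    value = ""
```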
44 changes: 40 additions & 4 deletions src/distilabel/pipeline/batch_manager.py
@@ -728,6 +728,7 @@ def __init__(
        last_batch_received: Dict[str, Union[_Batch, None]],
        last_batch_sent: Dict[str, Union[_Batch, None]],
        last_batch_flag_sent_to: List[str],
        received_batch_seq_nos: Dict[str, List[int]],
    ) -> None:
        """Initialize the `_BatchManager` instance.
@@ -740,12 +741,31 @@
                `_Batch` sent to the step.
            last_batch_flag_sent_to: A list with the names of the steps to which `LAST_BATCH_SENT_FLAG`
                was sent.
            received_batch_seq_nos: a dictionary containing the list of batches sequence
                numbers received per step.
        """

        self._steps = steps
        self._last_batch_received = last_batch_received
        self._last_batch_sent = last_batch_sent
        self._last_batch_flag_sent_to = last_batch_flag_sent_to
        self._received_batch_seq_nos = received_batch_seq_nos

    def _missing_seq_no(self, last_batch: _Batch) -> bool:
        """Checks if there's any missing sequence number in the batches received from the
        step.

        Args:
            last_batch: the batch with `last_batch==True` received from the step.

        Returns:
            `True` if there's any missing sequence number, `False` otherwise.
        """
        received_batch_seq_nos = self._received_batch_seq_nos[last_batch.step_name]
        for i in range(last_batch.seq_no + 1):
            if i not in received_batch_seq_nos:
                return True
        return False

    def can_generate(self) -> bool:
        """Checks if there are still batches to be processed by the steps.
@@ -759,6 +779,9 @@ def can_generate(self) -> bool:
            if not batch:
                return True

            if batch.last_batch and self._missing_seq_no(batch):
                return True

            if not batch.last_batch:
                return True

@@ -778,9 +801,13 @@ def register_batch(
             steps_data_path: The path where the outputs of each `Step` (considering its
                 signature) will be saved for later reuse in another pipelines executions.
         """
-        last_batch = self._last_batch_received[batch.step_name]
-        if not last_batch or (last_batch and last_batch.seq_no < batch.seq_no):
-            self._last_batch_received[batch.step_name] = batch
+        step_name = batch.step_name
+        seq_no = batch.seq_no
+        self._received_batch_seq_nos[step_name].append(seq_no)
+
+        last_batch = self._last_batch_received[step_name]
+        if not last_batch or (last_batch and last_batch.seq_no < seq_no):
+            self._last_batch_received[step_name] = batch
 
         if steps_data_path:
             self.write_batch_data(batch, steps_data_path)
@@ -955,13 +982,15 @@ def from_dag(  # noqa: C901
         last_batch_received = {}
         last_batch_sent = {}
         last_batch_flag_sent_to = []
+        received_batch_seq_nos = {}
 
         load_batches = {}
         steps_to_load_data_from_previous_executions: Dict[str, Union[Path, None]] = {}
         for step_name in dag:
             step: "_Step" = dag.get_step(step_name)[STEP_ATTR_NAME]
             last_batch_received[step.name] = None
             last_batch_sent[step.name] = None
+            received_batch_seq_nos[step.name] = []
             predecessors = list(dag.get_step_predecessors(step_name))
             convergence_step = all(
                 dag.get_step(predecessor).get(RECEIVES_ROUTED_BATCHES_ATTR_NAME, False)
@@ -1020,7 +1049,13 @@
                 )
                 batch_manager_step.last_batch_received.append(predecessor)
 
-        return cls(steps, last_batch_received, last_batch_sent, last_batch_flag_sent_to)
+        return cls(
+            steps,
+            last_batch_received,
+            last_batch_sent,
+            last_batch_flag_sent_to,
+            received_batch_seq_nos,
+        )

    def _model_dump(self, obj: Any, **kwargs: Any) -> Dict[str, Any]:
        """Dumps the content of the `_BatchManager` to a dictionary.
@@ -1043,6 +1078,7 @@ def _model_dump(self, obj: Any, **kwargs: Any) -> Dict[str, Any]:
                for step_name, batch in self._last_batch_sent.items()
            },
            "last_batch_flag_sent_to": self._last_batch_flag_sent_to,
            "received_batch_seq_nos": self._received_batch_seq_nos,
        }

    def cache(self, path: Path, steps_data_path: Path) -> None:  # noqa: C901
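
Taken together, `register_batch` now records every received sequence number and `_missing_seq_no` / `can_generate` use that record to keep the pipeline running until any gaps are filled. A toy, standalone illustration of the same gap check (plain Python, not the distilabel API):

```python
def has_missing_seq_no(received: list[int], last_seq_no: int) -> bool:
    # The stream is complete only when every sequence number from 0 to
    # last_seq_no (inclusive) has been registered.
    return any(i not in received for i in range(last_seq_no + 1))


# Batches 0, 1 and 3 arrived and the last batch reports seq_no=3,
# so 2 is still missing and the batch manager must keep waiting.
assert has_missing_seq_no([0, 1, 3], last_seq_no=3) is True
assert has_missing_seq_no([0, 1, 2, 3], last_seq_no=3) is False
```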
11 changes: 6 additions & 5 deletions src/distilabel/pipeline/step_wrapper.py
@@ -117,10 +117,10 @@ def run(self) -> str:
         self._non_generator_process_loop()
 
         # Just in case `None` sentinel was sent
-        try:
-            self.input_queue.get(block=False)
-        except Exception:
-            pass
+        # try:
+        #     self.input_queue.get(block=False)
+        # except Exception:
+        #     pass
 
         self.step.unload()

@@ -218,7 +218,8 @@ def _non_generator_process_loop(self) -> None:
         while True:
             if (batch := self.input_queue.get()) is None:
                 self.step._logger.info(
-                    f"🛑 Stopping processing batches from step '{self.step.name}'"
+                    f"🛑 Stopping processing batches from step '{self.step.name}' (replica"
+                    f" ID: {self.replica})"
                 )
                 break

@@ -26,6 +26,7 @@


@patch("huggingface_hub.AsyncInferenceClient")
@pytest.mark.xfail
class TestInferenceEndpointsImageGeneration:
@pytest.mark.asyncio
async def test_agenerate(self, mock_inference_client: MagicMock) -> None:
@@ -40,6 +40,7 @@ def mock_hf_token_env_variable() -> Generator[None, None, None]:


@patch("huggingface_hub.AsyncInferenceClient")
@pytest.mark.xfail
class TestInferenceEndpointsLLM:
def test_no_tokenizer_magpie_raise_value_error(
self, mock_inference_client: MagicMock
1 change: 1 addition & 0 deletions tests/unit/pipeline/test_base.py
@@ -760,6 +760,7 @@ def test_send_last_batch_flag_to_step(self) -> None:
            last_batch_received={step_name: None},
            last_batch_sent={step_name: None},
            last_batch_flag_sent_to=[],
            received_batch_seq_nos={},
        )

        with mock.patch.object(pipeline, "_send_to_step") as mock_sent_to_step: