For [reading](#read-documents), use [`ElasticsearchDatasource`](#read-documents) and, for writing, use [`ElasticsearchDatasink`](#write-documents).

### Read documents

You can read results from a specified index by using an `ElasticsearchDatasource` with Ray's [`read_datasource()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html#ray.data.read_datasource). Assuming you have an index named `test` that stores some numeric value in the `value` field, you can efficiently compute the sum of all values like so:

```python
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource

init()
source = ElasticsearchDatasource(index="test")
res = read_datasource(source).sum("value")
print(f"Read complete. Sum: {res}")
```

You can also specify a query to filter the documents to be read, e.g., a match query:

```python
source = ElasticsearchDatasource(
    index="test",
    query={
        "match": {
            "text": "foo",
        }
    },
)
```

Note that the parallel read does not enforce any ordering of the results even though the results are scored by Elasticsearch.
With the default settings, you can still access the retrieved score from the Ray `Dataset`'s `_score` column.
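
For example, a score-based ordering can be re-established after the read (a minimal sketch, assuming the default settings and the query example from above):

```python
ds = read_datasource(source)
# Sort by the Elasticsearch retrieval score, highest first.
top_10 = ds.sort("_score", descending=True).take(10)
```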

You do not need to set a fixed maximum concurrency level, but it is often a good idea to limit concurrency (and hence, the number of simultaneous requests to the Elasticsearch cluster) by setting the `concurrency` parameter in Ray's [`read_datasource()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html#ray.data.read_datasource):

```python
source = ElasticsearchDatasource(index="test")
ds = read_datasource(source, concurrency=100)
```

### Write documents

Analogously, you can write documents from a Ray `Dataset` to an Elasticsearch index by using an `ElasticsearchDatasink` with Ray's [`write_datasink()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_datasink.html#ray.data.Dataset.write_datasink):

```python
from ray import init
from ray.data import range
from ray_elasticsearch import ElasticsearchDatasink

init()
sink = ElasticsearchDatasink(index="test")
range(10_000) \
    .rename_columns({"id": "value"}) \
    .write_datasink(sink)
print("Write complete.")
```

Concurrency can again be limited by specifying the `concurrency` parameter in Ray's [`write_datasink()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_datasink.html#ray.data.Dataset.write_datasink).
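
For example, a minimal sketch that caps the number of concurrent write tasks:

```python
range(10_000) \
    .rename_columns({"id": "value"}) \
    .write_datasink(sink, concurrency=10)
```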

### Elasticsearch connection and authentication

Per default, the data source and sink access Elasticsearch on `localhost:9200`, the default of the [`elasticsearch` Python library](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html).
However, in most cases, you will instead want to connect to some remote Elasticsearch instance.
To do so, specify the connection settings as in the example below, using the same parameters as the [`Elasticsearch()`](https://elasticsearch-py.readthedocs.io/en/latest/api/elasticsearch.html#elasticsearch.Elasticsearch) constructor:

```python
source = ElasticsearchDatasource(
    index="test",
    hosts="<HOST>",
    http_auth=("<USERNAME>", "<PASSWORD>"),
    max_retries=10,
)
```

All client-related keyword arguments to the `ElasticsearchDatasource` or `ElasticsearchDatasink` are passed on to the [`Elasticsearch()`](https://elasticsearch-py.readthedocs.io/en/latest/api/elasticsearch.html#elasticsearch.Elasticsearch) constructor. Refer to the [documentation](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html) for an overview of the supported connection settings.

### Elasticsearch DSL

To simplify query construction, you can also use the [Elasticsearch DSL](https://elasticsearch-dsl.readthedocs.io/en/latest/) like this:

```python
from elasticsearch_dsl import Document, Text
from elasticsearch_dsl.query import Exists
from ray_elasticsearch import ElasticsearchDatasource, ElasticsearchDatasink

class Foo(Document):
    class Index:
        name = "test_foo"
    text: str = Text()

source = ElasticsearchDatasource(
    index=Foo,
    query=Exists(field="text"),
)
sink = ElasticsearchDatasink(index=Foo)
```

Note that, unlike in [Elasticsearch DSL](https://elasticsearch-dsl.readthedocs.io/en/latest/), the results are not parsed as Python objects but instead are returned as columns of the Ray `Dataset` (which internally uses the [Arrow format](https://arrow.apache.org/docs/python/index.html)).
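
For example, with the `Foo` document from above, fields are accessed as plain columns (a minimal sketch):

```python
row = read_datasource(source).take(1)[0]
print(row["text"])  # A plain dict entry, not a DSL `Document` attribute.
```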

### Selecting source and meta fields

Any document returned from or to be stored in Elasticsearch consists of the actual data, nested in the `_source` field, and some metadata (e.g., `_id` and `_index`) on the top level. However, working with nested columns can sometimes be tricky with Ray (e.g., nested columns cannot be renamed easily). Because you are most likely interested in the contents of the `_source` field, i.e., the indexed fields of the Elasticsearch index, the `ray-elasticsearch` library automatically unwraps the `_source` field. For example, consider the following Elasticsearch record:

```json
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_score" : null,
"_source" : {
"value" : 1
}
}
```

Using the default settings, the corresponding row in the Ray dataset will look like this:

```python
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_score" : None,
"value" : 1
}
```

You can also select the source and metadata fields explicitly, using the `source_fields` and `meta_fields` arguments:

```python
source = ElasticsearchDatasource(
index="test",
source_fields=["value"],
meta_fields=["id"],
)
```

With the above settings, only the ID and value will be stored in the Ray `Dataset`'s blocks:

```python
{
"_id" : "1",
"value" : 1
}
```

The metadata field prefix can be changed with the `meta_prefix` argument (the default is an underscore, `_`, just like with Elasticsearch).
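
For example, a minimal sketch that uses a `meta_` prefix instead:

```python
# Assuming the prefix replaces the default underscore, metadata
# columns would appear as, e.g., `meta_id` instead of `_id`.
source = ElasticsearchDatasource(
    index="test",
    meta_prefix="meta_",
)
```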

### Examples

More examples can be found in the [`examples`](examples/) directory.

### Compatibility

This library works with any of the following pip packages installed:

- `elasticsearch`
- `elasticsearch7`
- `elasticsearch8`
- `elasticsearch-dsl`
- `elasticsearch7-dsl`
- `elasticsearch8-dsl`

The `ray-elasticsearch` library will automatically detect if the Elasticsearch DSL is installed, and add support for [DSL-style queries](#elasticsearch-dsl) accordingly.

## Development

To build this package and contribute to its development, you need to install the `build`, `setuptools`, and `wheel` packages.
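
For example, with pip:

```shell
pip install build setuptools wheel
```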
