Skip to content

Commit

Permalink
Generalize to support both single-cluster and multi-cluster runners
Browse files Browse the repository at this point in the history
  • Loading branch information
gbanasiak committed Jan 31, 2025
1 parent f4d22a6 commit e3f42ea
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
9 changes: 6 additions & 3 deletions elastic/shared/runners/remote_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ async def __call__(self, multi_es, params):
continue
runner_for_op = runner_for(base_runner)
self.logger.info(f"Multi cluster wrapped runner [{base_runner}] executing on cluster [{cluster_name}].")
# call original runner assuming a single-cluster one, see https://github.com/elastic/rally/pull/488
# and https://github.com/elastic/rally/pull/1563 for additional context
# determine type of runner and call accordingly assuming the presence of MultiClientWrapper (!)
# see https://github.com/elastic/rally/pull/488 and https://github.com/elastic/rally/pull/1563
#
# don't mess with 'return' values
coroutines.append(runner_for_op({"default": cluster_client}, params))
if not hasattr(unwrap(runner_for_op), "multi_cluster"):
coroutines.append(runner_for_op({"default": cluster_client}, params))
else:
coroutines.append(runner_for_op(multi_es, params))
await asyncio.gather(*coroutines)

def __repr__(self, *args, **kwargs):
Expand Down
39 changes: 34 additions & 5 deletions elastic/tests/runners/remote_cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from unittest import mock

import pytest
from esrally.driver.runner import MultiClientRunner
from shared.runners.remote_cluster import (
ConfigureCrossClusterReplication,
ConfigureRemoteClusters,
Expand Down Expand Up @@ -468,18 +469,46 @@ def setup_params(self):

@pytest.mark.asyncio
@mock.patch("shared.runners.remote_cluster.runner_for")
async def test_wraps_correctly(self, mocked_runner_for, setup_es, setup_params):
async def test_wraps_single_cluster_runner_correctly(self, mocked_runner_for, setup_es, setup_params):
class UnitTestSingleClusterRunner:
async def __call__(self, es, params):
es_client = es["default"]
es_client.test_method(params["base-runner-param"])
es.test_method(params["base-runner-param"])
return {"weight": 1, "unit": "ops", "test": "value"}

def __str__(self):
return "UnitTestSingleClusterRunner"

base_runner = UnitTestSingleClusterRunner()
mocked_runner_for.return_value = base_runner
scr = UnitTestSingleClusterRunner()
# mimicks Rally runner internals, ugly but necessary
mocked_runner_for.return_value = MultiClientRunner(scr, str(scr), lambda es: es["default"])

mcw = MultiClusterWrapper()
r = await mcw(setup_es, setup_params)

for cluster_name, _ in setup_es.items():
# skipped clusters
if cluster_name not in ["cluster_0", "cluster_1"]:
setup_es[cluster_name].test_method.assert_has_calls([mock.call(setup_params["base-runner-param"])])

@pytest.mark.asyncio
@mock.patch("shared.runners.remote_cluster.runner_for")
async def test_wraps_multi_cluster_runner_correctly(self, mocked_runner_for, setup_es, setup_params):
# in this test multi-cluster wrapper is used together with multi-cluster runner creating a double loop
# the external loop is executed by the wrapper, while the internal by the runner
class UnitTestMultiClusterRunner:
multi_cluster = True

async def __call__(self, es, params):
for _, cluster_client in es.items():
cluster_client.test_method(params["base-runner-param"])
return {"weight": len(es), "unit": "ops", "test": "value"}

def __str__(self):
return "UnitTestMultiClusterRunner"

mcr = UnitTestMultiClusterRunner()
# mimicks Rally runner internals, ugly but necessary
mocked_runner_for.return_value = MultiClientRunner(mcr, str(mcr), lambda es: es)

mcw = MultiClusterWrapper()
r = await mcw(setup_es, setup_params)
Expand Down

0 comments on commit e3f42ea

Please sign in to comment.