Skip to content

Commit

Permalink
tests: rearrange test_running_cluster.py to reduce code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
elhimov authored and dmyger committed Nov 13, 2024
1 parent 5468a74 commit 9a56eaa
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 124 deletions.
196 changes: 72 additions & 124 deletions test/integration/running/test_running_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import yaml

from utils import (control_socket, extract_status, get_tarantool_version,
run_command_and_get_output, run_path, wait_file,
wait_pid_disappear)
lib_path, log_path, run_command_and_get_output, run_path,
wait_files, wait_pid_disappear)

tarantool_major_version, tarantool_minor_version = get_tarantool_version()

Expand All @@ -26,7 +26,7 @@ def start_application(cmd, workdir, app_name, instances):
assert f"Starting an instance [{app_name}:{inst}]" in start_output


def stop_application(tt_cmd, app_name, workdir, instances):
def stop_application(tt_cmd, workdir, app_name, instances):
stop_cmd = [tt_cmd, "stop", "-y", app_name]
stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=workdir)
assert stop_rc == 0
Expand All @@ -37,45 +37,58 @@ def stop_application(tt_cmd, app_name, workdir, instances):
assert not os.path.exists(os.path.join(workdir, run_path, app_name, inst, "tarantool.pid"))


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_running_base_functionality(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
app_name = "small_cluster_app"
def wait_cluster_started(tt_cmd, workdir, app_name, instances, inst_conf):
files = []
for inst in instances:
conf = inst_conf(workdir, app_name, inst)
files.append(conf['pid_file'])
files.append(conf['console_socket'])
assert wait_files(5, files)


def default_inst_conf(workdir, app_name, inst):
app_path = os.path.join(workdir, app_name)
return {
'pid_file': os.path.join(app_path, run_path, inst, 'tarantool.pid'),
'log_file': os.path.join(app_path, log_path, inst, 'tt.log'),
'console_socket': os.path.join(app_path, run_path, inst, control_socket),
'wal_dir': os.path.join(app_path, lib_path),
'snapshot_dir': os.path.join(app_path, lib_path),
}


def check_base_functionality(tt_cmd, tmpdir, app_name, instances, inst_conf):
app_path = os.path.join(tmpdir, app_name)
shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)

run_dir = os.path.join(tmpdir, app_name, run_path)
instances = ['storage-master', 'storage-replica']
try:
# Start an instance.
start_cmd = [tt_cmd, "start", app_name]
start_application(start_cmd, tmpdir, app_name, instances)
wait_cluster_started(tt_cmd, tmpdir, app_name, instances, inst_conf)

# Check status.
pidByInstanceName = {}
for inst in instances:
file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
assert file != ""
file = wait_file(os.path.join(run_dir, inst), control_socket, [])
assert file != ""
with open(os.path.join(run_dir, inst, 'tarantool.pid')) as f:
pidByInstanceName[inst] = f.readline()

status_cmd = [tt_cmd, "status", app_name]
status_rc, status_out = run_command_and_get_output(status_cmd, cwd=tmpdir)
assert status_rc == 0
status_info = extract_status(status_out)
pidByInstanceName = {}
for inst in instances:
assert status_info[app_name+":"+inst]["STATUS"] == "RUNNING"
assert os.path.exists(os.path.join(tmpdir, app_name, "var", "lib", inst))
assert os.path.exists(os.path.join(tmpdir, app_name, "var", "log", inst, "tt.log"))
assert not os.path.exists(os.path.join(tmpdir, app_name, "var", "log", inst,
"tarantool.log"))
assert status_info[f"{app_name}:{inst}"]["STATUS"] == "RUNNING"
conf = inst_conf(tmpdir, app_name, inst)
with open(conf['pid_file']) as f:
pidByInstanceName[inst] = f.readline()
assert os.path.exists(conf['wal_dir'])
assert os.path.exists(conf['snapshot_dir'])
assert os.path.exists(conf['log_file'])
if inst_conf != default_inst_conf:
assert os.path.exists(default_inst_conf(tmpdir, app_name, inst)['log_file'])
assert not os.path.exists(
os.path.join(os.path.dirname(conf['log_file']), "tarantool.log"))

# Test connection.
for inst in instances:
start_cmd = [tt_cmd, "connect", app_name + ":" + inst, "-f", "-"]
start_cmd = [tt_cmd, "connect", f"{app_name}:{inst}", "-f", "-"]
instance_process = subprocess.Popen(
start_cmd,
cwd=tmpdir,
Expand Down Expand Up @@ -103,31 +116,35 @@ def test_running_base_functionality(tt_cmd, tmpdir_with_cfg):
# Need to wait for all instances to stop before start.
for inst in instances:
# Wait when PID that was fetched on start disappears.
wait_pid_disappear(os.path.join(run_dir, inst, 'tarantool.pid'),
wait_pid_disappear(inst_conf(tmpdir, app_name, inst)['pid_file'],
pidByInstanceName.get(inst))

for inst in instances:
assert f"Starting an instance [{app_name}:{inst}]" in start_output

# Check status.
for inst in instances:
file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
assert file != ""
file = wait_file(os.path.join(run_dir, inst), control_socket, [])
assert file != ""

with open(os.path.join(run_dir, inst, 'tarantool.pid')) as f:
assert pidByInstanceName[inst] != f.readline()
wait_cluster_started(tt_cmd, tmpdir, app_name, instances, inst_conf)

# Check status.
status_cmd = [tt_cmd, "status", app_name]
status_rc, status_out = run_command_and_get_output(status_cmd, cwd=tmpdir)
assert status_rc == 0
status_info = extract_status(status_out)
for inst in instances:
assert status_info[app_name+":"+inst]["STATUS"] == "RUNNING"
assert status_info[f"{app_name}:{inst}"]["STATUS"] == "RUNNING"
with open(inst_conf(tmpdir, app_name, inst)['pid_file']) as f:
assert pidByInstanceName[inst] != f.readline()

finally:
stop_application(tt_cmd, app_name, tmpdir, instances)
stop_application(tt_cmd, tmpdir, app_name, instances)


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_running_base_functionality(tt_cmd, tmpdir_with_cfg):
app_name = "small_cluster_app"
instances = ['storage-master', 'storage-replica']
inst_conf = default_inst_conf
check_base_functionality(tt_cmd, tmpdir_with_cfg, app_name, instances, inst_conf)


@pytest.mark.skipif(tarantool_major_version < 3,
Expand All @@ -152,29 +169,25 @@ def test_running_base_functionality_crud_app(tt_cmd, tmpdir_with_cfg):
assert rc == 0
assert "Application was successfully built" in instance_process.stdout.read()

run_dir = os.path.join(tmpdir, app_name, run_path)
instances = ['router', 'storage1', 'storage2']
inst_conf = default_inst_conf

try:
# Start an application.
start_cmd = [tt_cmd, "start", app_name]
start_application(start_cmd, tmpdir, app_name, ['router', 'storage1', 'storage2'])
start_application(start_cmd, tmpdir, app_name, instances)
wait_cluster_started(tt_cmd, tmpdir, app_name, instances, inst_conf)

# Check status.
for inst in ['router', 'storage1', 'storage2']:
file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
assert file != ""
file = wait_file(os.path.join(run_dir, inst), control_socket, [])
assert file != ""

status_cmd = [tt_cmd, "status", app_name]
status_rc, status_out = run_command_and_get_output(status_cmd, cwd=tmpdir)
assert status_rc == 0
status_info = extract_status(status_out)
for inst in ['router', 'storage1', 'storage2']:
assert status_info["cluster_crud_app:"+inst]["STATUS"] == "RUNNING"
for inst in instances:
assert status_info[f"{app_name}:{inst}"]["STATUS"] == "RUNNING"

finally:
stop_application(tt_cmd, app_name, tmpdir, ['router', 'storage1', 'storage2'])
stop_application(tt_cmd, tmpdir, app_name, instances)


@pytest.mark.skipif(tarantool_major_version < 3,
Expand Down Expand Up @@ -244,89 +257,24 @@ def test_cluster_config_not_supported(tt_cmd, tmpdir_with_cfg):
@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_cluster_cfg_changes_defaults(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
app_name = "cluster_app_changed_defaults"
app_path = os.path.join(tmpdir, app_name)
shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)

try:
# Start an instance.
start_cmd = [tt_cmd, "start", app_name]
start_application(start_cmd, tmpdir, app_name, ['master'])

# Check status.
pidByInstanceName = {}
for inst in ['master']:
file = wait_file(app_path, "run_" + inst + '.pid', [])
assert file != ""
file = wait_file(app_path, "run_" + inst + ".control", [])
assert file != ""
with open(os.path.join(app_path, "run_" + inst + '.pid')) as f:
pidByInstanceName[inst] = f.readline()

status_cmd = [tt_cmd, "status", app_name]
status_rc, status_out = run_command_and_get_output(status_cmd, cwd=tmpdir)
assert status_rc == 0
status_info = extract_status(status_out)
for inst in ['master']:
assert status_info[app_name+":"+inst]["STATUS"] == "RUNNING"
assert os.path.exists(os.path.join(app_path, "var", "lib", inst + "_wal"))
assert os.path.exists(os.path.join(app_path, "var", "lib", inst + "_snapshot"))
assert os.path.exists(os.path.join(app_path, "var", "log", inst, "tt.log"))
assert os.path.exists(os.path.join(app_path, "tnt_" + inst + ".log"))

# Test connection.
for inst in ['master']:
start_cmd = [tt_cmd, "connect", app_name + ":" + inst, "-f", "-"]
instance_process = subprocess.Popen(
start_cmd,
cwd=tmpdir,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
text=True
)
instance_process.stdin.writelines(["6*7"])
instance_process.stdin.close()
output = instance_process.stdout.read()
assert "42" in output
instances = ['master']

# Restart an application.
restart_cmd = [tt_cmd, "restart", app_name, "-y"]
instance_process = subprocess.Popen(
restart_cmd,
cwd=tmpdir,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
text=True
)
start_output = instance_process.stdout.read()
assert f"Starting an instance [{app_name}:master]" in start_output

# Check status.
for inst in ['master']:
file = wait_file(app_path, "run_" + inst + '.pid', [])
assert file != ""
file = wait_file(app_path, "run_" + inst + ".control", [])
assert file != ""
with open(os.path.join(app_path, "run_" + inst + '.pid')) as f:
assert pidByInstanceName[inst] != f.readline()

status_cmd = [tt_cmd, "status", app_name]
status_rc, status_out = run_command_and_get_output(status_cmd, cwd=tmpdir)
assert status_rc == 0
status_info = extract_status(status_out)
for inst in ['master']:
assert status_info[app_name+":"+inst]["STATUS"] == "RUNNING"

finally:
stop_application(tt_cmd, app_name, tmpdir, ['master'])
def inst_conf(workdir, app_name, inst):
app_path = os.path.join(workdir, app_name)
return {
'pid_file': os.path.join(app_path, f'run_{inst}.pid'),
'console_socket': os.path.join(app_path, f'run_{inst}.control'),
'wal_dir': os.path.join(app_path, lib_path, f"{inst}_wal"),
'snapshot_dir': os.path.join(app_path, lib_path, f"{inst}_snapshot"),
'log_file': os.path.join(app_path, "tnt_" + inst + ".log"),
}
check_base_functionality(tt_cmd, tmpdir_with_cfg, app_name, instances, inst_conf)


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_cluster_error_cases(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
tmpdir = tmpdir_with_cfg
app_name = "cluster_app_changed_defaults"
app_path = os.path.join(tmpdir, app_name)
Expand Down
9 changes: 9 additions & 0 deletions test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ def wait_event(timeout, event_func, interval=0.1):
return False


def wait_files(timeout, files, interval=0.1):
def all_files_exist():
for file in files:
if not os.path.exists(file):
return False
return True
return wait_event(timeout, all_files_exist, interval)


class TarantoolTestInstance:
"""Create test tarantool instance via subprocess.Popen with given cfg file.
Expand Down

0 comments on commit 9a56eaa

Please sign in to comment.