-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkopf-test.py
30 lines (23 loc) · 1022 Bytes
/
kopf-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import shlex
import subprocess
import pytest
from kopf.testing import KopfRunner
def test_operator():
    """Run the operator, create and delete a test object, then check its output.

    Starts ``app/kopf_operator.py`` through ``KopfRunner`` limited to the
    ``project-operator`` namespace, applies and then deletes
    ``test/project-my-namespace.yaml`` with ``kubectl``, and finally asserts
    on the runner's exit code, exception and captured stdout.

    Needs a ``kubectl`` executable on ``PATH``; presumably it must point at a
    reachable cluster where the operator's resources are installed -- confirm
    against the test environment.

    Raises:
        subprocess.CalledProcessError: if either ``kubectl`` command exits
            with a non-zero status (``check=True``).
    """
    # Every CLI token is its own list element: a combined
    # '--namespace project-operator' string would be parsed as one (unknown)
    # option name instead of an option followed by its value.
    operator_args = [
        'run',
        '--namespace', 'project-operator',
        '--verbose',
        'app/kopf_operator.py',
    ]
    manifest = "test/project-my-namespace.yaml"

    with KopfRunner(operator_args) as runner:
        # Do something while the operator is running.
        # Argument lists are used so that no shell is involved.
        subprocess.run(["kubectl", "apply", "-f", manifest], check=True)
        time.sleep(30)  # give it some time to react and to sleep and to retry
        subprocess.run(["kubectl", "delete", "-f", manifest], check=True)
        time.sleep(30)  # give it some time to react

    # The results are inspected only after the runner context has been left,
    # as in the Kopf testing documentation example.
    assert runner.exit_code == 0
    assert runner.exception is None
    assert 'And here we are!' in runner.stdout
    assert 'Deleted, really deleted' in runner.stdout
def test_absent_file_fails():
    """Expect ``FileNotFoundError`` when the operator source file is missing."""
    missing_file_args = ['run', 'non-existent.py', '--standalone']
    # Nothing happens inside the context: the error is expected to surface
    # from running the operator itself.
    with pytest.raises(FileNotFoundError), KopfRunner(missing_file_args):
        pass
# Run the tests only when this file is executed directly as a script, so that
# merely importing it (e.g. during pytest collection) does not start them.
if __name__ == "__main__":
    test_operator()
    test_absent_file_fails()