Don't reuse function-scope fixtures across parallel tests #14
Comments
This issue brings similar concerns as a previous discussion that arose a little while ago in scikit-learn scikit-learn/scikit-learn#30007 (comment). Overall, the behaviour of global versus thread-local fixtures is underspecified, as represented by:

```python
import pytest
import asyncio

@pytest.fixture
def thread_local_fixture():
    def local_loop():
        loop = asyncio.new_event_loop()
        return loop
    return local_loop

def test_loop(thread_local_fixture):
    loop = thread_local_fixture()
    ...
```

Also, as @ngoldbaum mentioned over scikit-learn/scikit-learn#30007 (comment), pytest is not thread-safe, and there are particular motivations to keep it that way. |
I don't understand your comment. Function-scope fixtures are not reused by pytest, except when running with pytest-run-parallel. For example, if I write a test like:

```python
import pytest

COUNTER = 0

@pytest.fixture
def my_fixture():
    global COUNTER
    COUNTER += 1
    return COUNTER

@pytest.mark.parametrize('execution_number', range(5))
def test_my_fixture(my_fixture, execution_number):
    print(f"test_my_fixture called with {my_fixture} {execution_number}")
```

Each invocation of `test_my_fixture` gets a freshly created fixture, so the printed counter increments on every run. Similarly, however the test is repeated, each repetition gets its own fixture instance.
This is because we implement the parallelism at the level of the test call, so the fixture is resolved once and then shared by all worker threads. We would need to instead implement it at a level where each thread gets its own test item, and therefore its own fixture resolution. I'll try to figure it out. |
Parametrized tests probably aren't very useful, right? I thought a big part of the point of the current design was to use a single test item that runs the test function in multiple threads, rather than generating parametrized sub-items.

Between the added docs at https://py-free-threading.github.io/porting/#fixing-thread-unsafe-tests and the discussion spread over a couple of issues like scikit-learn/scikit-learn#30007, it feels like the actual design here is fairly tricky: there's no clear way of making everything magically robust to thread-safety issues in the tests themselves while also exercising thread-safety issues in the functionality under test well. It'd be great to discuss and document the current design with its pros and cons in enough detail to be able to understand these subtleties. I certainly hadn't grasped the issues with fixtures and parametrization until now. |
TL;DR: I've been looking into this over the last few days, and it seems doable to have a more robust version of the plugin. It's hard, though, and it'll require a decision on our side about whether we want to take on the additional maintenance burden that will come with a more robust solution. IMHO we should, since I expect this plugin to become increasingly popular as more projects want to test multi-threaded code. Longer version:

What I meant by the term "parametrization" was that a single test item is broken down into several sub-items, and the test report contains an entry for each of them.

There are certainly challenges, and not everything will be thread-safe. Some of the thread-safety issues that arise will have to be handled in user testing code. However, this specific issue (and other similar issues related to fixtures, skips, etc.) has more to do with the current design of the plugin, and with the fact that pytest thinks only one test is running while in reality the test function is invoked multiple times.
Agreed.
Not sure if that'll be needed in case we do a robust enough redesign so that some of the most common issues we have now go away. A possible solution for how to implement this, as I said in my previous comment, would be to generate one sub-item per thread at collection time, roughly like this:

```python
import threading
import pytest

# Hypothetical stash key; pytest stashes are keyed by StashKey instances,
# not plain strings.
THREAD_KEY = pytest.StashKey[threading.Thread]()

def thread_func(func, barrier, *args, **kwargs):
    # Wait until all sibling threads have started so the test bodies run concurrently.
    barrier.wait()
    func(*args, **kwargs)

def wrap_func(thread):
    def func(*args, **kwargs):
        # Forward the fixture arguments pytest resolved for this sub-item to
        # the worker thread (note: this pokes at Thread's private attributes).
        thread._args = (*thread._args, *args)
        thread._kwargs = {**thread._kwargs, **kwargs}
        thread.start()
    return func

@pytest.hookimpl
def pytest_collection_modifyitems(session, config, items: list[pytest.Function]):
    newitems = []
    for item in items:
        if n_workers == 1:  # obtaining n_workers is left out for conciseness
            newitems.append(item)
            continue
        barrier = threading.Barrier(n_workers)
        for i in range(n_workers):
            thread = threading.Thread(target=thread_func, args=(item.obj, barrier))
            # Change the test function from under pytest's nose to call
            # thread.start, which will run the test function in a new thread.
            newitem = pytest.Function.from_parent(
                item,
                name=f"{item.name}[{i}]",  # illustrative sub-item name
                callobj=wrap_func(thread),
            )
            newitem.stash[THREAD_KEY] = thread
            newitems.append(newitem)
    items[:] = newitems
```

We also need to reimplement the test loop to make sure all of the threads have terminated before doing anything else after the tests have run:

```python
@pytest.hookimpl
def pytest_runtestloop(session) -> bool:
    try:
        default_pytest_runtestloop(session)  # whatever the default loop does
    finally:
        # Make sure every worker thread has terminated before the session moves on.
        for item in session.items:
            thread = item.stash.get(THREAD_KEY, None)
            if thread is not None:
                thread.join()
    return True
```

This code works for more than one thread. We can probably extend it to do more than one iteration in each sub-item as well. The biggest challenge is how to communicate exceptions, skips, etc. to the main thread (pytest currently relies on them being raised in the thread that runs the test).
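One conventional way to get these back to the main thread is to capture per-thread exceptions and re-raise the first one after joining; skips and failures can travel the same route because pytest implements them as exceptions derived from `BaseException`. A minimal sketch under that assumption (`run_in_threads` is a hypothetical helper, not part of the plugin):

```python
import threading

def run_in_threads(func, n_workers, *args, **kwargs):
    errors = []  # list.append is thread-safe enough for this purpose
    barrier = threading.Barrier(n_workers)

    def worker():
        barrier.wait()
        try:
            func(*args, **kwargs)
        except BaseException as exc:  # also catches pytest's Skipped/Failed
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    if errors:
        # Re-raise in the main thread so pytest's normal reporting machinery sees it.
        raise errors[0]
```

Thoughts? |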
This can be solved if the last thread waits for an event and handles everything related to the other threads' completion; this would imply an exit barrier as well. That way, all the sub-item "test cases" are guaranteed to start running without blocking and to finish together; skips and exceptions, however, would be flagged from the first thread to report. Roughly:

```python
import threading
import pytest

# Hypothetical stash key, as in the previous sketch.
THREAD_KEY = pytest.StashKey[threading.Thread]()

def thread_func(func, barrier, is_last, evt, *args, **kwargs):
    barrier.wait()  # entry barrier: start all test bodies together
    try:
        func(*args, **kwargs)
    except BaseException:
        # Exception handling goes here; maybe store values in a shared
        # registry that enters as a parameter.
        ...
    barrier.wait()  # exit barrier: wait until every thread has finished
    if is_last:
        evt.set()

def wrap_func(thread, is_last, evt):
    def func(*args, **kwargs):
        thread._args = (*thread._args, *args)
        thread._kwargs = {**thread._kwargs, **kwargs}
        thread.start()
        if is_last:
            evt.wait()
            # Handle information stored in the registry here.
    return func

@pytest.hookimpl
def pytest_collection_modifyitems(session, config, items: list[pytest.Function]):
    newitems = []
    for item in items:
        if n_workers == 1:  # obtaining n_workers is left out for conciseness
            newitems.append(item)
            continue
        barrier = threading.Barrier(n_workers)
        evt = threading.Event()
        for i in range(n_workers):
            is_last = i == n_workers - 1
            thread = threading.Thread(
                target=thread_func, args=(item.obj, barrier, is_last, evt)
            )
            # Change the test function from under pytest's nose to call
            # thread.start, which will run the test function in a new thread.
            newitem = pytest.Function.from_parent(
                item,
                name=f"{item.name}[{i}]",  # illustrative sub-item name
                callobj=wrap_func(thread, is_last, evt),
            )
            newitem.stash[THREAD_KEY] = thread
            newitems.append(newitem)
    items[:] = newitems
```
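The "shared registry" mentioned in the comments above could be as small as a lock-protected list of per-thread outcomes; a hypothetical sketch:

```python
import threading

class ResultRegistry:
    """Collects per-thread outcomes so the last sub-item can report them."""

    def __init__(self):
        self._lock = threading.Lock()
        self._errors = []

    def record(self, thread_index, exc):
        with self._lock:
            self._errors.append((thread_index, exc))

    def first_error(self):
        with self._lock:
            return self._errors[0] if self._errors else None
```

`wrap_func` for the last sub-item could then call `registry.first_error()` after `evt.wait()` returns and re-raise the stored exception, so the failure is at least attributed to one reported item. |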
If I understand correctly, the default pytest behavior for fixtures is that a new fixture is created for each test case.
https://docs.pytest.org/en/6.2.x/reference.html#pytest-fixture-api
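For concreteness, a minimal sketch of those scope semantics (fixture names are illustrative):

```python
import pytest

@pytest.fixture  # scope="function" is the default: a new instance per test
def per_test():
    return object()

@pytest.fixture(scope="module")  # one instance shared by all tests in the module
def per_module():
    return object()

def test_a(per_test, per_module): ...
def test_b(per_test, per_module): ...  # fresh per_test, same per_module
```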
When running a test case multiple times in parallel, I don't think we should reuse fixtures across threads (at least not function scope fixtures).
For example, see test_async.py from the pybind11 test suite: https://github.com/pybind/pybind11/blob/f46f5be4fa4d24c4e5382d0251315f361ce97424/tests/test_async.py#L14-L18
The tests will fail when run in parallel because the event loop is shared across threads.
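For reference, the fixture at those lines is essentially a function-scope event loop (paraphrased here; treat the exact code as illustrative rather than a verbatim copy):

```python
import asyncio
import pytest

@pytest.fixture
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()
```

Because the yielded loop object is created once per collected test item, running that item's body concurrently in several threads makes them all drive the same loop, which asyncio does not support.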