Skip to content

Commit

Permalink
Merge pull request numpy#28144 from ngoldbaum/rm-update-flags
Browse files: browse the repository at this point in the history
BUG: remove unnecessary call to PyArray_UpdateFlags
  • Loading branch information
charris authored Jan 10, 2025
2 parents 6bedb61 + bb75e6e commit bbf4836
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 19 deletions.
1 change: 0 additions & 1 deletion numpy/_core/src/multiarray/iterators.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ PyArray_RawIterBaseInit(PyArrayIterObject *it, PyArrayObject *ao)
nd = PyArray_NDIM(ao);
/* The legacy iterator only supports 32 dimensions */
assert(nd <= NPY_MAXDIMS_LEGACY_ITERS);
PyArray_UpdateFlags(ao, NPY_ARRAY_C_CONTIGUOUS);
if (PyArray_ISCONTIGUOUS(ao)) {
it->contiguous = 1;
}
Expand Down
28 changes: 18 additions & 10 deletions numpy/_core/tests/test_multithreading.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def func(seed):

run_threaded(func, 500, pass_count=True)


def test_parallel_ufunc_execution():
# if the loop data cache or dispatch cache are not thread-safe
# computing ufuncs simultaneously in multiple threads leads
Expand All @@ -31,18 +32,14 @@ def func():
# see gh-26690
NUM_THREADS = 50

b = threading.Barrier(NUM_THREADS)

a = np.ones(1000)

def f():
def f(b):
b.wait()
return a.sum()

threads = [threading.Thread(target=f) for _ in range(NUM_THREADS)]
run_threaded(f, NUM_THREADS, max_workers=NUM_THREADS, pass_barrier=True)

[t.start() for t in threads]
[t.join() for t in threads]

def test_temp_elision_thread_safety():
amid = np.ones(50000)
Expand Down Expand Up @@ -121,16 +118,27 @@ def legacy_125():
task1.start()
task2.start()


def test_parallel_reduction():
    """Sum one shared array from many threads released together (gh-28041)."""
    # gh-28041
    NUM_THREADS = 50

    x = np.arange(1000)

    def closure(b):
        # The barrier is created by run_threaded (pass_barrier=True) and
        # handed to every task, so all workers start the reduction together.
        b.wait()
        np.sum(x)

    # run_threaded requires max_workers == iters whenever a barrier is
    # passed; otherwise some tasks could never reach b.wait().
    run_threaded(closure, NUM_THREADS, max_workers=NUM_THREADS,
                 pass_barrier=True)


def test_parallel_flat_iterator():
    """Exhaust the ``.flat`` iterator of one shared array from many threads."""
    # A transposed view of a reshaped range, shared by every worker.
    shared = np.arange(20).reshape(5, 4).T

    def consume_flat(barrier):
        # Wait until every worker is ready so the iterations overlap.
        barrier.wait()
        for _ in range(100):
            list(shared.flat)

    run_threaded(consume_flat, outer_iterations=100, pass_barrier=True)
28 changes: 20 additions & 8 deletions numpy/testing/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pprint
import sysconfig
import concurrent.futures
import threading

import numpy as np
from numpy._core import (
Expand Down Expand Up @@ -2685,12 +2686,23 @@ def _get_glibc_version():
_glibc_older_than = lambda x: (_glibcver != '0.0' and _glibcver < x)


def run_threaded(func, iters=8, pass_count=False, max_workers=8,
                 pass_barrier=False, outer_iterations=1):
    """Run a function many times in parallel on a thread pool.

    Parameters
    ----------
    func : callable
        The function to submit to the pool. It is called as
        ``func([count], [barrier])`` depending on `pass_count` and
        `pass_barrier`.
    iters : int, optional
        Number of tasks submitted per outer iteration.
    pass_count : bool, optional
        If True, the task index (``0 .. iters - 1``) is passed to `func`
        as its first positional argument.
    max_workers : int, optional
        Number of worker threads in each pool.
    pass_barrier : bool, optional
        If True, a ``threading.Barrier`` sized to `max_workers` is passed
        to `func` as its last positional argument.
    outer_iterations : int, optional
        Number of times the whole submit-and-wait cycle is repeated, each
        time with a fresh pool (and a fresh barrier, if requested).

    Raises
    ------
    RuntimeError
        If `pass_barrier` is True and ``max_workers != iters``.

    Notes
    -----
    Any exception raised inside a worker is re-raised in the calling
    thread when that task's result is collected.
    """
    # Validate once, before any pool is created: with fewer workers than
    # tasks, the barrier could never collect all of its parties.
    if pass_barrier and max_workers != iters:
        raise RuntimeError(
            "Must set max_workers equal to the number of "
            "iterations to avoid deadlocks.")

    for _ in range(outer_iterations):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        with pool as tpe:
            # One barrier per outer iteration, shared by all of its tasks.
            extra = (threading.Barrier(max_workers),) if pass_barrier else ()
            if pass_count:
                futures = [tpe.submit(func, i, *extra) for i in range(iters)]
            else:
                futures = [tpe.submit(func, *extra) for _ in range(iters)]
            # result() blocks until the task finishes and propagates errors.
            for future in futures:
                future.result()

0 comments on commit bbf4836

Please sign in to comment.