diff --git a/ci/tools/run-tests b/ci/tools/run-tests index 1ca54ba8207..9e00ace8647 100755 --- a/ci/tools/run-tests +++ b/ci/tools/run-tests @@ -1,6 +1,6 @@ #!/usr/bin/env bash -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 @@ -20,6 +20,14 @@ fi test_module=${1} +FREE_THREADING="" +PYTEST_PARALLEL_ARGS=() +if python -c 'import sys; assert not sys._is_gil_enabled()' 2> /dev/null; then + FREE_THREADING="-ft" + PYTEST_PARALLEL_ARGS=(--parallel-threads=4) + pip install pytest-run-parallel +fi + # For standard modes, install pathfinder up front (it is a direct dependency # of bindings, and a transitive dependency of core). Nightly modes install # all wheels together in a single pip call further below. @@ -36,7 +44,7 @@ if [[ "${test_module}" == "pathfinder" ]]; then "LD:${CUDA_PATHFINDER_TEST_LOAD_NVIDIA_DYNAMIC_LIB_STRICTNESS} " \ "FH:${CUDA_PATHFINDER_TEST_FIND_NVIDIA_HEADERS_STRICTNESS} " \ "BC:${CUDA_PATHFINDER_TEST_FIND_NVIDIA_BITCODE_LIB_STRICTNESS}" - pytest -ra -s -v --durations=0 tests/ |& tee /tmp/pathfinder_test_log.txt + pytest -ra -s -v --durations=0 "${PYTEST_PARALLEL_ARGS[@]}" tests/ |& tee /tmp/pathfinder_test_log.txt # Report the number of "INFO test_" lines (including zero) # to support quick validations based on GHA log archives. 
line_count=$(awk '/^INFO test_/ {count++} END {print count+0}' /tmp/pathfinder_test_log.txt) @@ -51,9 +59,9 @@ elif [[ "${test_module}" == "bindings" ]]; then pip install $(ls "${CUDA_BINDINGS_ARTIFACTS_DIR}"/*.whl)[all] --group test fi echo "Running bindings tests" - ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize tests/ + ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize "${PYTEST_PARALLEL_ARGS[@]}" tests/ if [[ "${SKIP_CYTHON_TEST}" == 0 ]]; then - ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize tests/cython + ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize "${PYTEST_PARALLEL_ARGS[@]}" tests/cython fi popd elif [[ "${test_module}" == "core" || "${test_module}" == nightly-* ]]; then @@ -61,11 +69,6 @@ elif [[ "${test_module}" == "core" || "${test_module}" == nightly-* ]]; then TEST_CUDA_MAJOR="$(cut -d '.' -f 1 <<< ${CUDA_VER})" CUDA_VER_MINOR="$(cut -d '.' -f 1-2 <<< "${CUDA_VER}")" - FREE_THREADING="" - if python -c 'import sys; assert not sys._is_gil_enabled()' 2> /dev/null; then - FREE_THREADING+="-ft" - fi - # Resolve bindings based on BINDINGS_SOURCE (set by env-vars): # main/backport → local wheel from artifacts dir # published → install from PyPI by version @@ -106,11 +109,11 @@ elif [[ "${test_module}" == "core" || "${test_module}" == nightly-* ]]; then echo "Installed packages before core tests:" pip list echo "Running core tests" - ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize tests/ + ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize "${PYTEST_PARALLEL_ARGS[@]}" tests/ # Currently our CI always installs the latest bindings (from either major version). # This is not compatible with the test requirements. 
if [[ "${SKIP_CYTHON_TEST}" == 0 ]]; then - ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize tests/cython + ${SANITIZER_CMD} pytest -rxXs -v --durations=0 --randomly-dont-reorganize "${PYTEST_PARALLEL_ARGS[@]}" tests/cython fi else # Nightly optional-dependency testing. diff --git a/cuda_bindings/tests/conftest.py b/cuda_bindings/tests/conftest.py index f30500c1342..f63beca6584 100644 --- a/cuda_bindings/tests/conftest.py +++ b/cuda_bindings/tests/conftest.py @@ -1,8 +1,11 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE +import functools +import inspect import pathlib import sys +from contextlib import contextmanager from importlib.metadata import PackageNotFoundError, distribution import pytest @@ -25,6 +28,84 @@ sys.path.insert(0, test_helpers_root) +def _parallel_threads_enabled(config): + parallel_threads = getattr(config.option, "parallel_threads", 0) + if parallel_threads == "auto": + return True + return parallel_threads is not None and int(parallel_threads) > 0 + + +def pytest_configure(config): + if _parallel_threads_enabled(config): + config.pluginmanager.register(_CudaBindingsParallelPlugin(), name="_cuda_bindings_parallel_plugin") + + +@contextmanager +def _thread_context(): + # Defensive: if this worker thread already has an active context (e.g. from + # double-wrapping), reuse it rather than pushing another one. + # Note: fixtures never run on the test thread; this is purely a safety net. + err, existing = cuda.cuCtxGetCurrent() + if err == cuda.CUresult.CUDA_SUCCESS and existing and int(existing) != 0: + yield None, existing + return + + # cuInit(0) is idempotent; safe to call even if cuda_driver fixture already ran. 
+ (err,) = cuda.cuInit(0) + assert err == cuda.CUresult.CUDA_SUCCESS + err, device = cuda.cuDeviceGet(0) + assert err == cuda.CUresult.CUDA_SUCCESS + err, ctx = cuda.cuCtxCreate(None, 0, device) + assert err == cuda.CUresult.CUDA_SUCCESS + try: + yield device, ctx + finally: + (err,) = cuda.cuCtxDestroy(ctx) + assert err == cuda.CUresult.CUDA_SUCCESS + + +def _wrap_worker_cuda_test(func): + if getattr(func, "_cuda_bindings_worker_cuda_wrapped", False): + return func + + sig = inspect.signature(func) + wants_device = "device" in sig.parameters + wants_ctx = "ctx" in sig.parameters + + @functools.wraps(func) + def wrapper(*args, **kwargs): + with _thread_context() as (device, ctx): + # device is None when reusing an existing context (defensive path); + # keep whatever the fixture provided in kwargs as-is. + if wants_device and device is not None: + kwargs["device"] = device + if wants_ctx: + kwargs["ctx"] = ctx + return func(*args, **kwargs) + + wrapper._cuda_bindings_worker_cuda_wrapped = True + return wrapper + + +def _item_needs_thread_ctx(item): + fixturenames = getattr(item, "fixturenames", ()) + # 'device' is present when the module-level ctx(device) autouse chain is + # active (test_cuda.py, test_kernelParams.py, nvml tests, …). + # 'driver' is present for test_cufile.py tests that use the local driver + # fixture; their local ctx() shadows the parent ctx(device) so 'device' + # does not appear in their fixture chain, but they still need a per-thread + # CUDA context for cuMemAlloc and similar calls made inside the test. 
+ return "device" in fixturenames or "driver" in fixturenames + + +class _CudaBindingsParallelPlugin: + @pytest.hookimpl(tryfirst=True) + def pytest_collection_modifyitems(self, config, items): + for item in items: + if _item_needs_thread_ctx(item): + item.obj = _wrap_worker_cuda_test(item.obj) + + @pytest.fixture(scope="module") def cuda_driver(): (err,) = cuda.cuInit(0) diff --git a/cuda_bindings/tests/nvml/test_init.py b/cuda_bindings/tests/nvml/test_init.py index 4c94dc26a3e..19e573c9cc6 100644 --- a/cuda_bindings/tests/nvml/test_init.py +++ b/cuda_bindings/tests/nvml/test_init.py @@ -42,6 +42,7 @@ def get_architecture_name(arch): @pytest.mark.skipif(sys.platform == "win32", reason="Test not supported on Windows") +@pytest.mark.thread_unsafe(reason="nvml init affects other threads") def test_init_ref_count(): """ Verifies that we can call NVML shutdown and init(2) multiple times, and that ref counting works diff --git a/cuda_bindings/tests/test_cuda.py b/cuda_bindings/tests/test_cuda.py index 32b05f638fe..4d11d08b287 100644 --- a/cuda_bindings/tests/test_cuda.py +++ b/cuda_bindings/tests/test_cuda.py @@ -456,6 +456,7 @@ def test_cuda_mem_range_attr(device): @pytest.mark.skipif(driverVersionLessThan(11040) or not supportsMemoryPool(), reason="Mempool for graphs not supported") +@pytest.mark.thread_unsafe(reason="used high memory can be higher if threaded.") def test_cuda_graphMem_attr(device): err, stream = cuda.cuStreamCreate(0) assert err == cuda.CUresult.CUDA_SUCCESS diff --git a/cuda_bindings/tests/test_cufile.py b/cuda_bindings/tests/test_cufile.py index 6e614ca1b05..1c92acd3c32 100644 --- a/cuda_bindings/tests/test_cufile.py +++ b/cuda_bindings/tests/test_cufile.py @@ -1,8 +1,7 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE import ctypes -import errno import logging import os import pathlib @@ -118,12 +117,6 @@ def get_tegra_kind(): ), ] -xfail_handle_register = pytest.mark.xfail( - condition=isSupportedFilesystem() and os.environ.get("CI") is not None, - raises=cufile.cuFileError, - reason="handle_register call fails in CI for unknown reasons", -) - def test_cufile_success_defined(): """Check if CUFILE_SUCCESS is defined in OpError enum.""" @@ -145,7 +138,7 @@ def ctx(): (err,) = cuda.cuCtxSetCurrent(ctx) assert err == cuda.CUresult.CUDA_SUCCESS - yield + yield ctx cuda.cuDevicePrimaryCtxRelease(device) @@ -204,11 +197,10 @@ def driver(ctx): @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_handle_register(): +def test_handle_register(tmpdir): """Test file handle registration with cuFile.""" # Create test file - file_path = "test_handle_register.bin" + file_path = tmpdir / "test_handle_register.bin" # Create file with POSIX operations fd = os.open(file_path, os.O_CREAT | os.O_RDWR, 0o600) @@ -242,8 +234,6 @@ def test_handle_register(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.usefixtures("driver") @@ -397,11 +387,10 @@ def test_buf_register_already_registered(): @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_cufile_read_write(): +def test_cufile_read_write(tmpdir): """Test cuFile read and write operations.""" # Create test file - file_path = "test_cufile_rw.bin" + file_path = tmpdir / "test_cufile_rw.bin" # Allocate CUDA memory for write and read write_size = 65536 # 64KB, aligned to 4096 bytes (65536 % 4096 == 0) @@ -478,21 +467,14 @@ def test_cufile_read_write(): # Free CUDA memory cuda.cuMemFree(write_buf) 
cuda.cuMemFree(read_buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_cufile_read_write_host_memory(): +def test_cufile_read_write_host_memory(tmpdir): """Test cuFile read and write operations using host memory.""" # Create test file - file_path = "test_cufile_rw_host.bin" + file_path = tmpdir / "test_cufile_rw_host.bin" # Allocate host memory for write and read write_size = 65536 # 64KB, aligned to 4096 bytes (65536 % 4096 == 0) @@ -565,21 +547,14 @@ def test_cufile_read_write_host_memory(): # Free host memory cuda.cuMemFreeHost(write_buf) cuda.cuMemFreeHost(read_buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_cufile_read_write_large(): +def test_cufile_read_write_large(tmpdir): """Test cuFile read and write operations with large data.""" # Create test file - file_path = "test_cufile_rw_large.bin" + file_path = tmpdir / "test_cufile_rw_large.bin" # Allocate large CUDA memory (1MB, aligned to 4096 bytes) write_size = 1024 * 1024 # 1MB, aligned to 4096 bytes (1048576 % 4096 == 0) @@ -659,21 +634,14 @@ def test_cufile_read_write_large(): # Free CUDA memory cuda.cuMemFree(write_buf) cuda.cuMemFree(read_buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("ctx", "cufile_env_json", "driver") -@xfail_handle_register -def test_cufile_write_async(): +def 
test_cufile_write_async(tmpdir): """Test cuFile asynchronous write operations.""" # Create test file - file_path = "test_cufile_write_async.bin" + file_path = tmpdir / "test_cufile_write_async.bin" fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600) try: @@ -741,17 +709,14 @@ def test_cufile_write_async(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("ctx", "cufile_env_json", "driver") -@xfail_handle_register -def test_cufile_read_async(): +def test_cufile_read_async(tmpdir): """Test cuFile asynchronous read operations.""" # Create test file - file_path = "test_cufile_read_async.bin" + file_path = tmpdir / "test_cufile_read_async.bin" # First create and write test data without O_DIRECT fd_temp = os.open(file_path, os.O_CREAT | os.O_RDWR, 0o600) @@ -832,17 +797,14 @@ def test_cufile_read_async(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") -@xfail_handle_register @pytest.mark.usefixtures("ctx", "cufile_env_json", "driver") -def test_cufile_async_read_write(): +def test_cufile_async_read_write(tmpdir): """Test cuFile asynchronous read and write operations in sequence.""" # Create test file - file_path = "test_cufile_async_rw.bin" + file_path = tmpdir / "test_cufile_async_rw.bin" fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600) try: @@ -946,17 +908,14 @@ def test_cufile_async_read_write(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_batch_io_basic(): +def test_batch_io_basic(tmpdir): """Test basic batch IO operations with 
multiple read/write operations.""" # Create test file - file_path = "test_batch_io.bin" + file_path = tmpdir / "test_batch_io.bin" # Allocate CUDA memory for multiple operations buf_size = 65536 # 64KB @@ -1145,21 +1104,14 @@ def test_batch_io_basic(): # Free CUDA memory for buf in buffers + read_buffers: cuda.cuMemFree(buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_batch_io_cancel(): +def test_batch_io_cancel(tmpdir): """Test batch IO cancellation.""" # Create test file - file_path = "test_batch_cancel.bin" + file_path = tmpdir / "test_batch_cancel.bin" # Allocate CUDA memory buf_size = 4096 # 4KB, aligned to 4096 bytes @@ -1229,21 +1181,14 @@ def test_batch_io_cancel(): # Free CUDA memory for buf in buffers: cuda.cuMemFree(buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("driver") -@xfail_handle_register -def test_batch_io_large_operations(): +def test_batch_io_large_operations(tmpdir): """Test batch IO with large buffer operations.""" # Create test file - file_path = "test_batch_large.bin" + file_path = tmpdir / "test_batch_large.bin" # Allocate large CUDA memory (1MB, aligned to 4096 bytes) buf_size = 1024 * 1024 # 1MB, aligned to 4096 bytes @@ -1421,12 +1366,6 @@ def test_batch_io_large_operations(): # Free CUDA memory for buf in all_buffers: cuda.cuMemFree(buf) - # Clean up test file - try: - os.unlink(file_path) - except OSError as e: - if e.errno != errno.ENOENT: - raise @pytest.mark.skipif( @@ -1647,11 +1586,11 @@ def test_stats_start_stop(): ) @pytest.mark.skipif(not isSupportedFilesystem(), 
reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("stats") -@xfail_handle_register -def test_get_stats_l1(): +@pytest.mark.thread_unsafe(reason="cuFile stats counters and collection state are process-global") +def test_get_stats_l1(tmpdir): """Test cuFile L1 statistics retrieval with file operations.""" # Create test file directly with O_DIRECT - file_path = "test_stats_l1.bin" + file_path = tmpdir / "test_stats_l1.bin" fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600) try: @@ -1718,8 +1657,6 @@ def test_get_stats_l1(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif( @@ -1727,11 +1664,11 @@ def test_get_stats_l1(): ) @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("stats") -@xfail_handle_register -def test_get_stats_l2(): +@pytest.mark.thread_unsafe(reason="cuFile stats counters and collection state are process-global") +def test_get_stats_l2(tmpdir): """Test cuFile L2 statistics retrieval with file operations.""" # Create test file directly with O_DIRECT - file_path = "test_stats_l2.bin" + file_path = tmpdir / "test_stats_l2.bin" fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600) try: @@ -1802,8 +1739,6 @@ def test_get_stats_l2(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif( @@ -1811,11 +1746,11 @@ def test_get_stats_l2(): ) @pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem") @pytest.mark.usefixtures("stats") -@xfail_handle_register -def test_get_stats_l3(): +@pytest.mark.thread_unsafe(reason="cuFile stats counters and collection state are process-global") +def test_get_stats_l3(tmpdir): """Test cuFile L3 statistics retrieval with file operations.""" # Create test file directly with O_DIRECT - file_path = "test_stats_l3.bin" + file_path = tmpdir / 
"test_stats_l3.bin" fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600) try: @@ -1896,8 +1831,6 @@ def test_get_stats_l3(): finally: os.close(fd) - with suppress(OSError): - os.unlink(file_path) @pytest.mark.skipif( diff --git a/cuda_bindings/tests/test_examples.py b/cuda_bindings/tests/test_examples.py index 0c3efe72811..bd7b51434fd 100644 --- a/cuda_bindings/tests/test_examples.py +++ b/cuda_bindings/tests/test_examples.py @@ -20,6 +20,7 @@ def test_example(example): env = os.environ.copy() env["CUDA_BINDINGS_SKIP_EXAMPLE"] = "100" + env["MPLBACKEND"] = "Agg" # avoid plt.show() from blocking process = subprocess.run([sys.executable, example], capture_output=True, env=env) # noqa: S603 # returncode is a special value used in the examples to indicate that system requirements are not met. diff --git a/cuda_core/cuda/core/_device.pyx b/cuda_core/cuda/core/_device.pyx index da6972f3727..451ca25ddaa 100644 --- a/cuda_core/cuda/core/_device.pyx +++ b/cuda_core/cuda/core/_device.pyx @@ -85,9 +85,12 @@ cdef class DeviceProperties: cdef inline int _get_cached_attribute(self, attr, default=0) except? 
-2: """Retrieve the attribute value, using cache if applicable.""" - if attr not in self._cache: - self._cache[attr] = self._get_attribute(attr, default) - return self._cache[attr] + cached = self._cache.get(attr) + if cached is not None: + return cached + cdef int value = self._get_attribute(attr, default) + self._cache[attr] = value # setdefault not needed for ints + return value @property def max_threads_per_block(self) -> int: @@ -1131,11 +1134,11 @@ class Device: def compute_capability(self) -> ComputeCapability: """Return a named tuple with 2 fields: major and minor.""" cdef DeviceProperties prop = self.properties - if "compute_capability" in prop._cache: - return prop._cache["compute_capability"] + cached = prop._cache.get("compute_capability") + if cached is not None: + return cached cc = ComputeCapability(prop.compute_capability_major, prop.compute_capability_minor) - prop._cache["compute_capability"] = cc - return cc + return prop._cache.setdefault("compute_capability", cc) @property def arch(self) -> str: diff --git a/cuda_core/cuda/core/_device_resources.pxd b/cuda_core/cuda/core/_device_resources.pxd index d618c24cf10..98f91ab4733 100644 --- a/cuda_core/cuda/core/_device_resources.pxd +++ b/cuda_core/cuda/core/_device_resources.pxd @@ -2,6 +2,8 @@ # # SPDX-License-Identifier: Apache-2.0 +cimport cython + from cuda.bindings cimport cydriver from cuda.core._resource_handles cimport ContextHandle, GreenCtxHandle @@ -15,6 +17,7 @@ cdef class SMResource: unsigned int _flags bint _is_usable object __weakref__ + cython.pymutex _split_mutex @staticmethod cdef SMResource _from_dev_resource(cydriver.CUdevResource res, int device_id) diff --git a/cuda_core/cuda/core/_device_resources.pyx b/cuda_core/cuda/core/_device_resources.pyx index ecd9e00bf05..bafc462c936 100644 --- a/cuda_core/cuda/core/_device_resources.pyx +++ b/cuda_core/cuda/core/_device_resources.pyx @@ -498,10 +498,11 @@ cdef class SMResource: ) _resolve_group_count(opts) _check_green_ctx_support() - 
if _can_use_structured_sm_split(): - return _split_with_general_api(self, opts, dry_run) - # SplitByCount requires the same 12.4+ as green ctx support (already checked above) - return _split_with_count_api(self, opts, dry_run) + with self._split_mutex: + if _can_use_structured_sm_split(): + return _split_with_general_api(self, opts, dry_run) + # SplitByCount requires the same 12.4+ as green ctx support (already checked above) + return _split_with_count_api(self, opts, dry_run) cdef class WorkqueueResource: diff --git a/cuda_core/cuda/core/_memory/_buffer.pxd b/cuda_core/cuda/core/_memory/_buffer.pxd index 98c4b50db31..83dcd4f68c2 100644 --- a/cuda_core/cuda/core/_memory/_buffer.pxd +++ b/cuda_core/cuda/core/_memory/_buffer.pxd @@ -3,6 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 from libc.stdint cimport uintptr_t +from libcpp cimport bool as cpp_bool +from libcpp.atomic cimport atomic as std_atomic, memory_order_acquire, memory_order_release from cuda.bindings cimport cydriver from cuda.core._resource_handles cimport DevicePtrHandle @@ -18,13 +20,13 @@ cdef struct _MemAttrs: cdef class Buffer: cdef: - DevicePtrHandle _h_ptr - MemoryResource _memory_resource - object _ipc_data - object _owner - _MemAttrs _mem_attrs - bint _mem_attrs_inited - object __weakref__ + DevicePtrHandle _h_ptr + MemoryResource _memory_resource + object _ipc_data + object _owner + _MemAttrs _mem_attrs + std_atomic[cpp_bool] _mem_attrs_inited + object __weakref__ cdef public: # Python code in _memory/_virtual_memory_resource.py needs to update # this value, though it is technically private. 
diff --git a/cuda_core/cuda/core/_memory/_buffer.pyi b/cuda_core/cuda/core/_memory/_buffer.pyi index 728853c4bc7..7118a3a1e07 100644 --- a/cuda_core/cuda/core/_memory/_buffer.pyi +++ b/cuda_core/cuda/core/_memory/_buffer.pyi @@ -2,6 +2,7 @@ from __future__ import annotations +import cython from cuda.core._memory._device_memory_resource import DeviceMemoryResource from cuda.core._memory._ipc import IPCBufferDescriptor from cuda.core._memory._pinned_memory_resource import PinnedMemoryResource @@ -88,6 +89,7 @@ class Buffer: """ @property + @cython.critical_section def ipc_descriptor(self) -> IPCBufferDescriptor: """Descriptor for sharing this buffer with other processes.""" diff --git a/cuda_core/cuda/core/_memory/_buffer.pyx b/cuda_core/cuda/core/_memory/_buffer.pyx index 88f9054385a..00359c1f0bf 100644 --- a/cuda_core/cuda/core/_memory/_buffer.pyx +++ b/cuda_core/cuda/core/_memory/_buffer.pyx @@ -96,7 +96,7 @@ cdef class Buffer: self._memory_resource = None self._ipc_data = None self._owner = None - self._mem_attrs_inited = False + self._mem_attrs_inited.store(False) def __init__(self, *args, **kwargs) -> None: raise RuntimeError("Buffer objects cannot be instantiated directly. 
" @@ -126,7 +126,7 @@ cdef class Buffer: self._memory_resource = mr self._ipc_data = IPCDataForBuffer(ipc_descriptor, True) if ipc_descriptor is not None else None self._owner = owner - self._mem_attrs_inited = False + self._mem_attrs_inited.store(False) return self @staticmethod @@ -191,6 +191,7 @@ cdef class Buffer: return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_descriptor, stream) @property + @cython.critical_section def ipc_descriptor(self) -> IPCBufferDescriptor: """Descriptor for sharing this buffer with other processes.""" if self._ipc_data is None: @@ -445,11 +446,12 @@ cdef class Buffer: # Memory Attribute Query Helpers # ------------------------------ +@cython.critical_section cdef inline void _init_mem_attrs(Buffer self): """Initialize memory attributes by querying the pointer.""" - if not self._mem_attrs_inited: + if not self._mem_attrs_inited.load(memory_order_acquire): _query_memory_attrs(self._mem_attrs, as_cu(self._h_ptr)) - self._mem_attrs_inited = True + self._mem_attrs_inited.store(True, memory_order_release) cdef inline int _query_memory_attrs( @@ -597,7 +599,7 @@ cdef Buffer Buffer_from_deviceptr_handle( buf._memory_resource = mr buf._ipc_data = IPCDataForBuffer(ipc_descriptor, True) if ipc_descriptor is not None else None buf._owner = None - buf._mem_attrs_inited = False + buf._mem_attrs_inited.store(False) return buf diff --git a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyi b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyi index 4ff85eb5972..b34f968fdc9 100644 --- a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyi +++ b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyi @@ -2,8 +2,6 @@ from __future__ import annotations -from functools import cache - from cuda.core._device import Device from cuda.core._memory._buffer import Buffer, MemoryResource from cuda.core._stream import Stream @@ -113,7 +111,6 @@ class GraphMemoryResource(cyGraphMemoryResource): ... 
@classmethod - @cache def _create(cls, device_id: int) -> GraphMemoryResource: ... __all__ = ['GraphMemoryResource'] \ No newline at end of file diff --git a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx index 479322ab017..e845a47b080 100644 --- a/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx +++ b/cuda_core/cuda/core/_memory/_graph_memory_resource.pyx @@ -18,7 +18,6 @@ from cuda.core._resource_handles cimport ( from cuda.core._stream cimport Stream_accept, Stream from cuda.core._utils.cuda_utils cimport HANDLE_RETURN -from functools import cache from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -161,6 +160,8 @@ cdef class cyGraphMemoryResource(MemoryResource): return False +cdef dict _mem_resource_cache = {} + class GraphMemoryResource(cyGraphMemoryResource): """ A memory resource for memory related to graphs. @@ -185,9 +186,16 @@ class GraphMemoryResource(cyGraphMemoryResource): return cls._create(c_device_id) @classmethod - @cache def _create(cls, int device_id) -> GraphMemoryResource: - return cyGraphMemoryResource.__new__(cls, device_id) + # we use a dict currently, because functools.cache is currently less + # thread-safe see also: https://github.com/python/cpython/issues/150708 + res = _mem_resource_cache.get(device_id) + if res is not None: + return res + + # create new instance, but in case of a race may return another: + new = cyGraphMemoryResource.__new__(cls, device_id) + return _mem_resource_cache.setdefault(device_id, new) # Raise an exception if the given stream is capturing. 
diff --git a/cuda_core/cuda/core/_memory/_memory_pool.pyi b/cuda_core/cuda/core/_memory/_memory_pool.pyi index 3d15d9f679f..7f8c64aedda 100644 --- a/cuda_core/cuda/core/_memory/_memory_pool.pyi +++ b/cuda_core/cuda/core/_memory/_memory_pool.pyi @@ -4,6 +4,7 @@ from __future__ import annotations import uuid +import cython from cuda.core._memory._buffer import Buffer, MemoryResource from cuda.core._stream import Stream from cuda.core.graph import GraphBuilder @@ -97,6 +98,7 @@ class _MemPool(MemoryResource): """ @property + @cython.critical_section def attributes(self) -> _MemPoolAttributes: """Memory pool attributes.""" diff --git a/cuda_core/cuda/core/_memory/_memory_pool.pyx b/cuda_core/cuda/core/_memory/_memory_pool.pyx index c6276f0f3de..ddcac2d6063 100644 --- a/cuda_core/cuda/core/_memory/_memory_pool.pyx +++ b/cuda_core/cuda/core/_memory/_memory_pool.pyx @@ -4,6 +4,7 @@ from __future__ import annotations +cimport cython from libc.limits cimport ULLONG_MAX from libc.stdint cimport uintptr_t from libc.string cimport memset @@ -177,6 +178,7 @@ cdef class _MemPool(MemoryResource): _MP_deallocate(self, ptr, size, s) @property + @cython.critical_section def attributes(self) -> _MemPoolAttributes: """Memory pool attributes.""" if self._attributes is None: diff --git a/cuda_core/cuda/core/_memoryview.pyx b/cuda_core/cuda/core/_memoryview.pyx index c65107ae273..260980c1daf 100644 --- a/cuda_core/cuda/core/_memoryview.pyx +++ b/cuda_core/cuda/core/_memoryview.pyx @@ -4,6 +4,7 @@ from __future__ import annotations +cimport cython from ._dlpack cimport * from ._dlpack import classify_dl_device from libc.stdint cimport intptr_t @@ -80,7 +81,7 @@ cdef inline bint _is_torch_tensor(object obj): cdef str mod = tp.__module__ or "" cdef bint result = mod.startswith("torch") and hasattr(obj, "data_ptr") \ and _torch_version_check() - _torch_type_cache[tp] = result + _torch_type_cache[tp] = result # setdefault not needed for bools return result @@ -539,6 +540,7 @@ cdef class 
StridedMemoryView: + f" readonly={self.readonly},\n" + f" exporting_obj={get_simple_repr(self.exporting_obj)})") + @cython.critical_section cdef inline _StridedLayout get_layout(self): if self._layout is None: if self.dl_tensor: @@ -549,6 +551,7 @@ cdef class StridedMemoryView: raise ValueError("Cannot infer layout from the exporting object") return self._layout + @cython.critical_section cdef inline object get_buffer(self): """ Returns Buffer instance with the underlying data. @@ -562,6 +565,7 @@ cdef class StridedMemoryView: self._buffer = Buffer.from_handle(self.ptr, 0, owner=self.exporting_obj) return self._buffer + @cython.critical_section cdef inline object get_dtype(self): if self._dtype is None: if self.dl_tensor != NULL: diff --git a/cuda_core/cuda/core/_module.pyi b/cuda_core/cuda/core/_module.pyi index caf6b09b717..5125b99131a 100644 --- a/cuda_core/cuda/core/_module.pyi +++ b/cuda_core/cuda/core/_module.pyi @@ -5,6 +5,7 @@ from __future__ import annotations from collections import namedtuple from os import PathLike +import cython from cuda.core._device import Device from cuda.core._launch_config import LaunchConfig from cuda.core._stream import Stream @@ -253,6 +254,7 @@ class Kernel: ... 
@property + @cython.critical_section def attributes(self) -> KernelAttributes: """Get the read-only attributes of this kernel.""" @@ -265,6 +267,7 @@ class Kernel: """list[ParamInfo]: (offset, size) for each argument of this function""" @property + @cython.critical_section def occupancy(self) -> KernelOccupancy: """Get the occupancy information for launching this kernel.""" diff --git a/cuda_core/cuda/core/_module.pyx b/cuda_core/cuda/core/_module.pyx index 5cb1b7f0059..91c8ad43895 100644 --- a/cuda_core/cuda/core/_module.pyx +++ b/cuda_core/cuda/core/_module.pyx @@ -4,6 +4,7 @@ from __future__ import annotations +cimport cython from libc.stddef cimport size_t from collections import namedtuple @@ -83,7 +84,7 @@ cdef class KernelAttributes: cdef int result with nogil: HANDLE_RETURN(cydriver.cuKernelGetAttribute(&result, attribute, as_cu(self._h_kernel), device_id)) - self._cache[cache_key] = result + self._cache[cache_key] = result # setdefault not needed for ints return result def __getitem__(self, device: Device | int) -> KernelAttributes: @@ -454,6 +455,7 @@ cdef class Kernel: return ker @property + @cython.critical_section def attributes(self) -> KernelAttributes: """Get the read-only attributes of this kernel.""" if self._attributes is None: @@ -501,6 +503,7 @@ cdef class Kernel: return param_info @property + @cython.critical_section def occupancy(self) -> KernelOccupancy: """Get the occupancy information for launching this kernel.""" if self._occupancy is None: @@ -742,6 +745,7 @@ cdef class ObjectCode: # TODO: do we want to unload in a finalizer? Probably not.. 
+ @cython.critical_section cdef int _lazy_load_module(self) except -1: if self._h_library: return 0 diff --git a/cuda_core/cuda/core/graph/_graph_node.pyx b/cuda_core/cuda/core/graph/_graph_node.pyx index f627edf9bb2..d3d684aff3e 100644 --- a/cuda_core/cuda/core/graph/_graph_node.pyx +++ b/cuda_core/cuda/core/graph/_graph_node.pyx @@ -78,8 +78,7 @@ _node_registry: weakref.WeakValueDictionary[int, GraphNode] = weakref.WeakValueD cdef inline GraphNode _registered(GraphNode n): - _node_registry[n._h_node.get()] = n - return n + return _node_registry.setdefault(n._h_node.get(), n) cdef class GraphNode: @@ -162,10 +161,11 @@ cdef class GraphNode: cdef cydriver.CUgraphNode node = as_cu(self._h_node) if node == NULL: return - with nogil: - HANDLE_RETURN(cydriver.cuGraphDestroyNode(node)) + _node_registry.pop(self._h_node.get(), None) invalidate_graph_node(self._h_node) + with nogil: + HANDLE_RETURN(cydriver.cuGraphDestroyNode(node)) @property def pred(self) -> AdjacencySetProxy: diff --git a/cuda_core/tests/conftest.py b/cuda_core/tests/conftest.py index d7a81d88904..bf87677fd8d 100644 --- a/cuda_core/tests/conftest.py +++ b/cuda_core/tests/conftest.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import functools import multiprocessing import os import pathlib @@ -91,6 +92,68 @@ def xfail_if_mempool_oom(err_or_exc, api_name=None, device=0): sys.path.insert(0, test_helpers_root) +def pytest_configure(config): + if _parallel_threads_enabled(config): + config.pluginmanager.register(_CudaCoreParallelPlugin(), name="_cuda_core_parallel_plugin") + + +def _parallel_threads_enabled(config): + parallel_threads = getattr(config.option, "parallel_threads", 0) + if parallel_threads == "auto": + return True + + return parallel_threads is not None and int(parallel_threads) > 0 + + +@contextmanager +def _init_cuda_context(): + # TODO: rename this to e.g. 
init_context + device = Device(0) + device.set_current() + + # Set option to avoid spin-waiting on synchronization. + if int(os.environ.get("CUDA_CORE_TEST_BLOCKING_SYNC", 0)) != 0: + handle_return( + driver.cuDevicePrimaryCtxSetFlags(device.device_id, driver.CUctx_flags.CU_CTX_SCHED_BLOCKING_SYNC) + ) + + try: + yield device + finally: + _ = _device_unset_current() + + +def _wrap_worker_cuda_test(func): + if getattr(func, "_cuda_core_worker_cuda_wrapped", False): + return func + + @functools.wraps(func) + def wrapper(*args, **kwargs): + with _init_cuda_context() as device: + if "init_cuda" in kwargs: + kwargs["init_cuda"] = device + if "mempool_device_x2" in kwargs: + kwargs["mempool_device_x2"] = _mempool_device_impl(2) + if "mempool_device_x3" in kwargs: + kwargs["mempool_device_x3"] = _mempool_device_impl(3) + return func(*args, **kwargs) + + wrapper._cuda_core_worker_cuda_wrapped = True + return wrapper + + +def _item_uses_init_cuda(item): + return "init_cuda" in getattr(item, "fixturenames", ()) + + +class _CudaCoreParallelPlugin: + @pytest.hookimpl(tryfirst=True) + def pytest_collection_modifyitems(self, config, items): + for item in items: + if _item_uses_init_cuda(item): + item.obj = _wrap_worker_cuda_test(item.obj) + + def skip_if_pinned_memory_unsupported(device): try: if not device.properties.host_memory_pools_supported: @@ -194,18 +257,8 @@ def session_setup(): @pytest.fixture def init_cuda(): - # TODO: rename this to e.g. init_context - device = Device(0) - device.set_current() - - # Set option to avoid spin-waiting on synchronization. 
- if int(os.environ.get("CUDA_CORE_TEST_BLOCKING_SYNC", 0)) != 0: - handle_return( - driver.cuDevicePrimaryCtxSetFlags(device.device_id, driver.CUctx_flags.CU_CTX_SCHED_BLOCKING_SYNC) - ) - - yield device - _ = _device_unset_current() + with _init_cuda_context() as device: + yield device def _device_unset_current() -> bool: @@ -247,7 +300,7 @@ def pop_all_contexts(): @pytest.fixture -def ipc_device(): +def ipc_device(init_cuda): """Obtains a device suitable for IPC-enabled mempool tests, or skips. The fixture also tracks every ``multiprocessing.Process`` spawned during @@ -257,8 +310,7 @@ def ipc_device(): """ from helpers.child_processes import track_child_processes - device = Device(0) - device.set_current() + device = init_cuda if not device.properties.memory_pools_supported: pytest.skip("Device does not support mempool operations") @@ -290,13 +342,15 @@ def ipc_memory_resource(request, ipc_device): assert mr.is_ipc_enabled yield mr mr.close() + # TODO(seberg): Make sure the `mr` and its buffers are fully torn down. + # May be unnecessary as `mr.close()` is not parallel with other work. 
+ ipc_device.sync() @pytest.fixture -def mempool_device(): +def mempool_device(init_cuda): """Obtains a device suitable for mempool tests, or skips.""" - device = Device(0) - device.set_current() + device = init_cuda if not device.properties.memory_pools_supported: pytest.skip("Device does not support mempool operations") @@ -323,13 +377,13 @@ def _mempool_device_impl(num): @pytest.fixture -def mempool_device_x2(): +def mempool_device_x2(init_cuda): """Fixture that provides two devices if available, otherwise skips test.""" return _mempool_device_impl(2) @pytest.fixture -def mempool_device_x3(): +def mempool_device_x3(init_cuda): """Fixture that provides three devices if available, otherwise skips test.""" return _mempool_device_impl(3) diff --git a/cuda_core/tests/example_tests/test_basic_examples.py b/cuda_core/tests/example_tests/test_basic_examples.py index 43fab4241db..c8d15677a54 100644 --- a/cuda_core/tests/example_tests/test_basic_examples.py +++ b/cuda_core/tests/example_tests/test_basic_examples.py @@ -100,6 +100,7 @@ def has_recent_memory_pool_support() -> bool: @pytest.mark.parametrize("example", sample_files) +@pytest.mark.parallel_threads_limit(8) def test_example(example): example_path = os.path.join(samples_path, example) has_package_requirements_or_skip(example_path) diff --git a/cuda_core/tests/graph/test_graph_definition.py b/cuda_core/tests/graph/test_graph_definition.py index da78bea577f..b18b2ac9bda 100644 --- a/cuda_core/tests/graph/test_graph_definition.py +++ b/cuda_core/tests/graph/test_graph_definition.py @@ -575,20 +575,6 @@ def node_spec(request, init_cuda): # ============================================================================= -@pytest.fixture -def sample_graphdef(init_cuda): - """A sample GraphDefinition for standalone tests.""" - return GraphDefinition() - - -@pytest.fixture -def dot_file(tmp_path): - """Temporary DOT file path, cleaned up after test.""" - path = tmp_path / "graph.dot" - yield path - 
path.unlink(missing_ok=True) - - # ============================================================================= # Topology tests (parameterized over graph specs) # ============================================================================= @@ -775,14 +761,16 @@ def registered(node): # ============================================================================= -def test_graphdef_handle_valid(sample_graphdef): +def test_graphdef_handle_valid(init_cuda): """GraphDefinition has a valid non-null handle.""" + sample_graphdef = GraphDefinition() assert sample_graphdef.handle is not None assert int(sample_graphdef.handle) != 0 -def test_graphdef_entry_is_virtual(sample_graphdef): +def test_graphdef_entry_is_virtual(init_cuda): """Internal entry node is virtual (no pred/succ, type is None).""" + sample_graphdef = GraphDefinition() entry = sample_graphdef._entry assert isinstance(entry, GraphNode) assert entry.pred == set() @@ -795,8 +783,9 @@ def test_graphdef_entry_is_virtual(sample_graphdef): # ============================================================================= -def test_alloc_zero_size_fails(sample_graphdef): +def test_alloc_zero_size_fails(init_cuda): """Alloc with zero size raises error (CUDA limitation).""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() from cuda.core._utils.cuda_utils import CUDAError @@ -804,8 +793,9 @@ def test_alloc_zero_size_fails(sample_graphdef): sample_graphdef.allocate(0) -def test_free_creates_dependency(sample_graphdef): +def test_free_creates_dependency(init_cuda): """Free node depends on its predecessor.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): alloc = sample_graphdef.allocate(ALLOC_SIZE) @@ -813,8 +803,9 @@ def test_free_creates_dependency(sample_graphdef): assert alloc in free.pred -def test_alloc_free_chain(sample_graphdef): +def test_alloc_free_chain(init_cuda): """Alloc and free can be chained.""" + sample_graphdef = GraphDefinition() 
_skip_if_no_mempool() with xfail_on_graph_mempool_oom(): a1 = sample_graphdef.allocate(ALLOC_SIZE) @@ -831,8 +822,9 @@ def test_alloc_free_chain(sample_graphdef): # ============================================================================= -def test_alloc_memory_type_invalid(sample_graphdef): +def test_alloc_memory_type_invalid(init_cuda): """Invalid memory type raises ValueError.""" + sample_graphdef = GraphDefinition() with pytest.raises(ValueError, match="Invalid memory_type"): sample_graphdef.allocate(ALLOC_SIZE, memory_type="invalid") @@ -844,8 +836,9 @@ def test_alloc_memory_type_invalid(sample_graphdef): pytest.param(lambda d: d, id="Device_object"), ], ) -def test_alloc_device_option(sample_graphdef, device_spec): +def test_alloc_device_option(init_cuda, device_spec): """Device can be specified as int or Device object.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() device = Device() with xfail_on_graph_mempool_oom(device): @@ -868,8 +861,9 @@ def test_alloc_peer_access(mempool_device_x2): @pytest.mark.parametrize("num_branches", [2, 3, 5]) -def test_join_merges_branches(sample_graphdef, num_branches): +def test_join_merges_branches(init_cuda, num_branches): """join() with multiple branches creates correct dependencies.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): branches = [sample_graphdef.allocate(ALLOC_SIZE) for _ in range(num_branches)] @@ -883,8 +877,9 @@ def test_join_merges_branches(sample_graphdef, num_branches): # ============================================================================= -def test_launch_creates_node(sample_graphdef): +def test_launch_creates_node(init_cuda): """launch() creates a KernelNode.""" + sample_graphdef = GraphDefinition() mod = compile_common_kernels() kernel = mod.get_kernel("empty_kernel") config = LaunchConfig(grid=1, block=1) @@ -892,8 +887,9 @@ def test_launch_creates_node(sample_graphdef): assert isinstance(node, KernelNode) -def 
test_launch_chain_dependencies(sample_graphdef): +def test_launch_chain_dependencies(init_cuda): """Chained launches create correct dependencies.""" + sample_graphdef = GraphDefinition() mod = compile_common_kernels() kernel = mod.get_kernel("empty_kernel") config = LaunchConfig(grid=1, block=1) @@ -955,15 +951,17 @@ def _instantiate_and_upload(graph_definition, kwargs, stream): @pytest.mark.parametrize("inst_kwargs", _INSTANTIATE_ONLY_OPTIONS) -def test_instantiate_empty_graph(sample_graphdef, inst_kwargs): +def test_instantiate_empty_graph(init_cuda, inst_kwargs): """Empty graph can be instantiated.""" + sample_graphdef = GraphDefinition() graph = _instantiate(sample_graphdef, inst_kwargs) assert graph is not None @pytest.mark.parametrize("inst_kwargs", _INSTANTIATE_ONLY_OPTIONS) -def test_instantiate_with_nodes(sample_graphdef, inst_kwargs): +def test_instantiate_with_nodes(init_cuda, inst_kwargs): """Graph with nodes can be instantiated.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): sample_graphdef.allocate(ALLOC_SIZE) @@ -973,8 +971,9 @@ def test_instantiate_with_nodes(sample_graphdef, inst_kwargs): @pytest.mark.skipif(not Device(0).properties.unified_addressing, reason="requires unified addressing") -def test_instantiate_and_execute_kernel_device_launch(sample_graphdef): +def test_instantiate_and_execute_kernel_device_launch(init_cuda): """Kernel-only graph can be instantiated with device_launch flag.""" + sample_graphdef = GraphDefinition() mod = compile_common_kernels() kernel = mod.get_kernel("empty_kernel") config = LaunchConfig(grid=1, block=1) @@ -990,8 +989,9 @@ def test_instantiate_and_execute_kernel_device_launch(sample_graphdef): @pytest.mark.parametrize("inst_kwargs", _EXECUTE_OPTIONS) -def test_instantiate_and_execute_kernel(sample_graphdef, inst_kwargs): +def test_instantiate_and_execute_kernel(init_cuda, inst_kwargs): """Graph with kernel can be instantiated and executed.""" + sample_graphdef = 
GraphDefinition() mod = compile_common_kernels() kernel = mod.get_kernel("empty_kernel") config = LaunchConfig(grid=1, block=1) @@ -1004,8 +1004,9 @@ def test_instantiate_and_execute_kernel(sample_graphdef, inst_kwargs): @pytest.mark.parametrize("inst_kwargs", _EXECUTE_OPTIONS) -def test_instantiate_and_execute_alloc_free(sample_graphdef, inst_kwargs): +def test_instantiate_and_execute_alloc_free(init_cuda, inst_kwargs): """Graph with alloc/free can be executed.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): alloc = sample_graphdef.allocate(ALLOC_SIZE) @@ -1018,8 +1019,9 @@ def test_instantiate_and_execute_alloc_free(sample_graphdef, inst_kwargs): @pytest.mark.parametrize("inst_kwargs", _EXECUTE_OPTIONS) -def test_instantiate_and_execute_memset(sample_graphdef, inst_kwargs): +def test_instantiate_and_execute_memset(init_cuda, inst_kwargs): """Graph with alloc/memset/free can be executed.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): alloc = sample_graphdef.allocate(ALLOC_SIZE) @@ -1033,8 +1035,9 @@ def test_instantiate_and_execute_memset(sample_graphdef, inst_kwargs): @pytest.mark.parametrize("inst_kwargs", _EXECUTE_OPTIONS) -def test_instantiate_and_execute_memcpy(sample_graphdef, inst_kwargs): +def test_instantiate_and_execute_memcpy(init_cuda, inst_kwargs): """Graph with alloc/memset/memcpy/free can be executed and data is copied.""" + sample_graphdef = GraphDefinition() _skip_if_no_mempool() import ctypes @@ -1058,8 +1061,9 @@ def test_instantiate_and_execute_memcpy(sample_graphdef, inst_kwargs): assert all(b == 0xAB for b in host_buf) -def test_instantiate_and_execute_child_graph(sample_graphdef): +def test_instantiate_and_execute_child_graph(init_cuda): """Graph with embedded child graph can be executed.""" + sample_graphdef = GraphDefinition() child = GraphDefinition() mod = compile_common_kernels() kernel = mod.get_kernel("empty_kernel") @@ -1075,8 +1079,9 
@@ def test_instantiate_and_execute_child_graph(sample_graphdef): stream.sync() -def test_instantiate_and_execute_host_callback(sample_graphdef): +def test_instantiate_and_execute_host_callback(init_cuda): """Graph with host callback can be executed and callback is invoked.""" + sample_graphdef = GraphDefinition() results = [] def my_callback(): @@ -1093,8 +1098,9 @@ def my_callback(): assert results == [42] -def test_instantiate_and_execute_host_callback_cfunc(sample_graphdef): +def test_instantiate_and_execute_host_callback_cfunc(init_cuda): """Graph with ctypes function pointer callback can be executed.""" + sample_graphdef = GraphDefinition() import ctypes CALLBACK = ctypes.CFUNCTYPE(None, ctypes.c_void_p) @@ -1115,8 +1121,9 @@ def raw_fn(data): assert called[0] -def test_host_callback_cfunc_with_user_data(sample_graphdef): +def test_host_callback_cfunc_with_user_data(init_cuda): """Host callback with bytes user_data passes data to C function.""" + sample_graphdef = GraphDefinition() import ctypes CALLBACK = ctypes.CFUNCTYPE(None, ctypes.c_void_p) @@ -1137,14 +1144,16 @@ def read_byte(data): assert result[0] == 0xAB -def test_host_callback_user_data_rejected_for_python_callable(sample_graphdef): +def test_host_callback_user_data_rejected_for_python_callable(init_cuda): """user_data is rejected for Python callables.""" + sample_graphdef = GraphDefinition() with pytest.raises(ValueError, match="user_data is only supported"): sample_graphdef.callback(lambda: None, user_data=b"hello") -def test_instantiate_and_execute_event_record_wait(sample_graphdef): +def test_instantiate_and_execute_event_record_wait(init_cuda): """Graph with event record and wait nodes can be executed.""" + sample_graphdef = GraphDefinition() event = Device().create_event() rec = sample_graphdef.record(event) rec.wait(event) @@ -1166,8 +1175,9 @@ def _skip_unless_cc_90(): pytest.skip("Conditional node execution requires CC >= 9.0 (Hopper)") -def 
test_instantiate_and_execute_if_then(sample_graphdef): +def test_instantiate_and_execute_if_then(init_cuda): """If-conditional node: body executes only when condition is non-zero.""" + sample_graphdef = GraphDefinition() _skip_unless_cc_90() _skip_if_no_mempool() import ctypes @@ -1199,8 +1209,9 @@ def test_instantiate_and_execute_if_then(sample_graphdef): assert result[0] == 1 -def test_instantiate_and_execute_if_else(sample_graphdef): +def test_instantiate_and_execute_if_else(init_cuda): """If-else node: then or else branch executes based on condition.""" + sample_graphdef = GraphDefinition() _skip_unless_cc_90() _skip_if_no_mempool() import ctypes @@ -1234,8 +1245,9 @@ def test_instantiate_and_execute_if_else(sample_graphdef): assert result[0] == 2 -def test_instantiate_and_execute_switch(sample_graphdef): +def test_instantiate_and_execute_switch(init_cuda): """Switch node: selected branch executes based on condition value.""" + sample_graphdef = GraphDefinition() _skip_unless_cc_90() _skip_if_no_mempool() import ctypes @@ -1268,8 +1280,9 @@ def test_instantiate_and_execute_switch(sample_graphdef): assert result[0] == 1 -def test_conditional_node_type_preserved_by_nodes(sample_graphdef): +def test_conditional_node_type_preserved_by_nodes(init_cuda): """Conditional nodes appear as ConditionalNode base when read back from graph.""" + sample_graphdef = GraphDefinition() condition = try_create_condition(sample_graphdef) if_node = sample_graphdef.if_then(condition) assert isinstance(if_node, IfNode) @@ -1285,8 +1298,10 @@ def test_conditional_node_type_preserved_by_nodes(sample_graphdef): # ============================================================================= -def test_debug_dot_print_creates_file(sample_graphdef, dot_file): +def test_debug_dot_print_creates_file(init_cuda, tmp_path): """debug_dot_print writes a DOT file.""" + sample_graphdef = GraphDefinition() + dot_file = tmp_path / "graph.dot" _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): 
sample_graphdef.allocate(ALLOC_SIZE) @@ -1296,8 +1311,10 @@ def test_debug_dot_print_creates_file(sample_graphdef, dot_file): assert "digraph" in content -def test_debug_dot_print_with_options(sample_graphdef, dot_file): +def test_debug_dot_print_with_options(init_cuda, tmp_path): """debug_dot_print accepts GraphDebugPrintOptions.""" + sample_graphdef = GraphDefinition() + dot_file = tmp_path / "graph.dot" _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): sample_graphdef.allocate(ALLOC_SIZE) @@ -1306,8 +1323,10 @@ def test_debug_dot_print_with_options(sample_graphdef, dot_file): assert dot_file.exists() -def test_debug_dot_print_invalid_options(sample_graphdef, dot_file): +def test_debug_dot_print_invalid_options(init_cuda, tmp_path): """debug_dot_print rejects invalid options type.""" + sample_graphdef = GraphDefinition() + dot_file = tmp_path / "graph.dot" _skip_if_no_mempool() with xfail_on_graph_mempool_oom(): sample_graphdef.allocate(ALLOC_SIZE) diff --git a/cuda_core/tests/graph/test_graph_memory_resource.py b/cuda_core/tests/graph/test_graph_memory_resource.py index 9fc794f4cca..482e8fe1c57 100644 --- a/cuda_core/tests/graph/test_graph_memory_resource.py +++ b/cuda_core/tests/graph/test_graph_memory_resource.py @@ -21,6 +21,13 @@ from cuda.core._utils.cuda_utils import CUDAError from cuda.core.graph import GraphCompleteOptions +# NOTE(seberg): "global" mode seems thread-unsafe even when working on stream +_GRAPH_MODES = [ + pytest.param("global", marks=pytest.mark.thread_unsafe(reason="gb instances share stream unsafely")), + "thread_local", + "relaxed", +] + def _common_kernels_alloc(): code = """ @@ -80,7 +87,7 @@ def free(self, buffers): self.stream.sync() -@pytest.mark.parametrize("mode", ["no_graph", "global", "thread_local", "relaxed"]) +@pytest.mark.parametrize("mode", ["no_graph"] + _GRAPH_MODES) @pytest.mark.parametrize("action", ["incr", "fill"]) def test_graph_alloc(mempool_device, mode, action): """Test basic graph capture with memory 
allocated and deallocated by @@ -130,7 +137,7 @@ def apply_kernels(mr, stream, out): assert compare_buffer_to_constant(out, 3) else: # Capture work, then upload and launch. - gb = device.create_graph_builder().begin_building(mode) + gb = stream.create_graph_builder().begin_building(mode) with xfail_on_graph_mempool_oom(device): apply_kernels(mr=gmr, stream=gb, out=out) graph = gb.end_building().complete() @@ -150,7 +157,7 @@ def apply_kernels(mr, stream, out): @pytest.mark.skipif(IS_WINDOWS or IS_WSL, reason="auto_free_on_launch not supported on Windows") -@pytest.mark.parametrize("mode", ["global", "thread_local", "relaxed"]) +@pytest.mark.parametrize("mode", _GRAPH_MODES) def test_graph_alloc_with_output(mempool_device, mode): """Test for memory allocated in a graph being used outside the graph.""" NBYTES = 64 @@ -168,7 +175,7 @@ def test_graph_alloc_with_output(mempool_device, mode): # Construct a graph to copy and increment the input. It returns a new # buffer allocated within the graph. The auto_free_on_launch option # is required to properly use the output buffer. 
- gb = device.create_graph_builder().begin_building(mode) + gb = stream.create_graph_builder().begin_building(mode) with xfail_on_graph_mempool_oom(device): out = gmr.allocate(NBYTES, stream=gb) out.copy_from(in_, stream=gb) @@ -195,7 +202,8 @@ def test_graph_alloc_with_output(mempool_device, mode): assert compare_buffer_to_constant(out, 6) -@pytest.mark.parametrize("mode", ["global", "thread_local", "relaxed"]) +@pytest.mark.parametrize("mode", _GRAPH_MODES) +@pytest.mark.thread_unsafe(reason="gb instances share default stream") def test_graph_mem_alloc_zero(mempool_device, mode): device = mempool_device gb = device.create_graph_builder().begin_building(mode) @@ -213,7 +221,8 @@ def test_graph_mem_alloc_zero(mempool_device, mode): assert buffer.device_id == int(device) -@pytest.mark.parametrize("mode", ["global", "thread_local", "relaxed"]) +@pytest.mark.parametrize("mode", _GRAPH_MODES) +@pytest.mark.thread_unsafe(reason="GMR is shared, so high mark is global") def test_graph_mem_set_attributes(mempool_device, mode): device = mempool_device stream = device.create_stream() @@ -265,7 +274,7 @@ def test_graph_mem_set_attributes(mempool_device, mode): mman.reset() -@pytest.mark.parametrize("mode", ["global", "thread_local", "relaxed"]) +@pytest.mark.parametrize("mode", _GRAPH_MODES) def test_gmr_check_capture_state(mempool_device, mode): """ Test expected errors (and non-errors) using GraphMemoryResource with graph @@ -284,7 +293,7 @@ def test_gmr_check_capture_state(mempool_device, mode): gmr.allocate(1, stream=stream) # Capturing - gb = device.create_graph_builder().begin_building(mode=mode) + gb = stream.create_graph_builder().begin_building(mode=mode) with xfail_on_graph_mempool_oom(device): gmr.allocate(1, stream=gb) # no error gb.end_building().complete() @@ -320,7 +329,7 @@ def test_graph_memory_resource_attributes_repr(mempool_device): assert "used_mem_high=" in r -@pytest.mark.parametrize("mode", ["global", "thread_local", "relaxed"]) 
+@pytest.mark.parametrize("mode", _GRAPH_MODES) def test_dmr_check_capture_state(mempool_device, mode): """ Test expected errors (and non-errors) using DeviceMemoryResource with graph @@ -334,7 +343,7 @@ def test_dmr_check_capture_state(mempool_device, mode): dmr.allocate(1, stream=stream).close() # no error # Capturing - gb = device.create_graph_builder().begin_building(mode=mode) + gb = stream.create_graph_builder().begin_building(mode=mode) with pytest.raises( RuntimeError, match=r"cannot perform memory operations on a capturing " diff --git a/cuda_core/tests/helpers/latch.py b/cuda_core/tests/helpers/latch.py index c28fb222641..978e2dbdf18 100644 --- a/cuda_core/tests/helpers/latch.py +++ b/cuda_core/tests/helpers/latch.py @@ -1,7 +1,8 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import ctypes +import threading import pytest @@ -20,9 +21,15 @@ class LatchKernel: Manages a kernel that blocks stream progress until released. 
""" - def __init__(self, device, timeout_sec=60): - if helpers.CUDA_INCLUDE_PATH is None: - pytest.skip("need CUDA header") + _latch_kernel_lock = threading.Lock() + _latch_kernels = {} + + @classmethod + def _get_kernel(cls, device): + kernel = cls._latch_kernels.get(device.uuid) + if kernel is not None: + return kernel + code = """ #include @@ -41,6 +48,7 @@ def __init__(self, device, timeout_sec=60): // Check for timeout if (clock64() - start >= timeout_cycles) { + signal.store(-1, cuda::memory_order_relaxed); break; // Timeout reached } @@ -56,14 +64,25 @@ def __init__(self, device, timeout_sec=60): ) prog = Program(code, code_type="c++", options=program_options) mod = prog.compile(target_type="cubin") - self.kernel = mod.get_kernel("latch") + kernel = mod.get_kernel("latch") + + return cls._latch_kernels.setdefault(device.uuid, kernel) + + def __init__(self, device, timeout_sec=60): + if helpers.CUDA_INCLUDE_PATH is None: + pytest.skip("need CUDA header") + + with self._latch_kernel_lock: + self.kernel = self._get_kernel(device) mr = LegacyPinnedMemoryResource() self.buffer = mr.allocate(4) - self.busy_wait_flag[0] = 0 + self.busy_wait_flag[0] = 1 clock_rate_hz = device.properties.clock_rate * 1000 self.timeout_cycles = int(timeout_sec * clock_rate_hz) + self.busy_wait_flag[0] = 0 + def launch(self, stream): """Launch the latch kernel, blocking stream progress via busy waiting.""" config = LaunchConfig(grid=1, block=1) diff --git a/cuda_core/tests/memory/test_managed_ops.py b/cuda_core/tests/memory/test_managed_ops.py index b1f0c74664a..89917041c93 100644 --- a/cuda_core/tests/memory/test_managed_ops.py +++ b/cuda_core/tests/memory/test_managed_ops.py @@ -345,6 +345,7 @@ def test_from_handle(self, init_cuda): finally: plain.close() + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_read_mostly_roundtrip(self, external_managed_buffer): buf = external_managed_buffer assert buf.read_mostly is False @@ -353,6 +354,7 
@@ def test_read_mostly_roundtrip(self, external_managed_buffer): buf.read_mostly = False assert buf.read_mostly is False + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_preferred_location_roundtrip(self, location_ops_device, external_managed_buffer): device = location_ops_device buf = external_managed_buffer @@ -367,6 +369,7 @@ def test_preferred_location_roundtrip(self, location_ops_device, external_manage buf.preferred_location = None assert buf.preferred_location is None + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_preferred_location_roundtrip_host_numa(self, location_ops_device): """Host(numa_id=N) round-trips correctly on CUDA 13 builds.""" from cuda.core._utils.version import binding_version @@ -387,6 +390,7 @@ def test_preferred_location_roundtrip_host_numa(self, location_ops_device): finally: plain.close() + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_accessed_by_add_discard(self, location_ops_device, external_managed_buffer): device = location_ops_device buf = external_managed_buffer @@ -398,6 +402,7 @@ def test_accessed_by_add_discard(self, location_ops_device, external_managed_buf buf.accessed_by.discard(device) assert device not in buf.accessed_by + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_accessed_by_mutable_set_interface(self, location_ops_device, external_managed_buffer): """Full MutableSet conformance pass on AccessedBySetProxy. 
@@ -417,6 +422,7 @@ def test_accessed_by_mutable_set_interface(self, location_ops_device, external_m non_member=Host(numa_id=0), ) + @pytest.mark.thread_unsafe(reason="external_managed_buffer is shared between threads") def test_accessed_by_set_assignment(self, location_ops_device, external_managed_buffer): device = location_ops_device buf = external_managed_buffer diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py index 42f34dd61c2..4fc119fc170 100644 --- a/cuda_core/tests/memory_ipc/test_errors.py +++ b/cuda_core/tests/memory_ipc/test_errors.py @@ -16,6 +16,10 @@ POOL_SIZE = 2097152 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + + def test_outer_timeout_marker_is_applied(request): """Verify that memory_ipc/conftest.py applies the outer pytest-timeout marker. diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py index 48985e67b58..e3cefe6a211 100644 --- a/cuda_core/tests/memory_ipc/test_event_ipc.py +++ b/cuda_core/tests/memory_ipc/test_event_ipc.py @@ -16,6 +16,10 @@ NBYTES = 64 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + + @pytest.mark.skipif(Device().compute_capability.major < 7, reason="__nanosleep is only available starting Volta (sm70)") class TestEventIpc: """Check the basic usage of IPC-enabled events with a latch kernel.""" diff --git a/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py b/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py index 8d450fa8e3f..eaa6ddec92f 100644 --- a/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py +++ b/cuda_core/tests/memory_ipc/test_ipc_duplicate_import.py @@ -24,6 +24,9 @@ ENABLE_LOGGING = False # Set True for test debugging and development +# these tests spawn new processes and files which fails for very many threads +pytestmark =
pytest.mark.parallel_threads_limit(4) + def child_main(log, queue): log.prefix = " child: " diff --git a/cuda_core/tests/memory_ipc/test_leaks.py b/cuda_core/tests/memory_ipc/test_leaks.py index 6fc4d03f142..c6e44824137 100644 --- a/cuda_core/tests/memory_ipc/test_leaks.py +++ b/cuda_core/tests/memory_ipc/test_leaks.py @@ -23,6 +23,8 @@ not USING_FDS or not HAVE_PSUTIL, reason="mempool allocation handle is not using fds or psutil is unavailable" ) +pytestmark = pytest.mark.thread_unsafe(reason="Tests number of fds which is shared.") + @pytest.mark.flaky(reruns=2) @skip_if_unrunnable diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index 0923fe28d8b..43d356789e7 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -14,6 +14,9 @@ NWORKERS = 2 NTASKS = 2 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + class TestIpcMempool: @pytest.mark.flaky(reruns=2) diff --git a/cuda_core/tests/memory_ipc/test_peer_access.py b/cuda_core/tests/memory_ipc/test_peer_access.py index 9e9e2879ae7..ac7f71a88e9 100644 --- a/cuda_core/tests/memory_ipc/test_peer_access.py +++ b/cuda_core/tests/memory_ipc/test_peer_access.py @@ -14,6 +14,9 @@ NBYTES = 64 POOL_SIZE = 2097152 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + class TestPeerAccessNotPreservedOnImport: """ @@ -89,6 +92,8 @@ def test_main(self, ipc_mempool_device_x2, grant_access_in_parent): assert process.exitcode == 0 buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + dev1.sync() mr.close() def child_main(self, mr, buffer): @@ -126,4 +131,6 @@ def child_main(self, mr, buffer): PatternGen(dev0, NBYTES).verify_buffer(buffer, seed=False) buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with 
incomplete `buf.close()` + dev1.sync() mr.close() diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index cc7f45d67c2..59216cd9cce 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -16,6 +16,9 @@ NTASKS = 7 POOL_SIZE = 2097152 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + class TestIpcSendBuffers: @pytest.mark.flaky(reruns=2) @@ -26,6 +29,7 @@ def test_main(self, ipc_device, nmrs): device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] + buffers = [] try: # Allocate and fill memory. @@ -51,6 +55,10 @@ def test_main(self, ipc_device, nmrs): pgen.verify_buffer(buffer, seed=True) buffer.close() finally: + for buffer in buffers: + buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() for mr in mrs: mr.close() diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py index 2f0e429b103..4289de4b5a9 100644 --- a/cuda_core/tests/memory_ipc/test_serialize.py +++ b/cuda_core/tests/memory_ipc/test_serialize.py @@ -15,6 +15,9 @@ NBYTES = 64 POOL_SIZE = 2097152 +# these tests spawn new processes and files which fails for very many threads +pytestmark = pytest.mark.parallel_threads_limit(4) + class TestObjectSerializationDirect: """ diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py index 609fadbcf3e..358c16fd7bf 100644 --- a/cuda_core/tests/memory_ipc/test_workerpool.py +++ b/cuda_core/tests/memory_ipc/test_workerpool.py @@ -16,6 +16,9 @@ NTASKS = 20 POOL_SIZE = 2097152 +# these tests spawn new processes and files which fails for very many threads +pytestmark = 
pytest.mark.parallel_threads_limit(4) + class TestIpcWorkerPool: """ @@ -32,6 +35,7 @@ def test_main(self, ipc_device, nmrs): device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] + buffers = [] try: buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] @@ -42,8 +46,11 @@ def test_main(self, ipc_device, nmrs): pgen = PatternGen(device, NBYTES) for buffer in buffers: pgen.verify_buffer(buffer, seed=True) - buffer.close() finally: + for buffer in buffers: + buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() for mr in mrs: mr.close() @@ -74,6 +81,7 @@ def test_main(self, ipc_device, nmrs): device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] + buffers = [] try: buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] @@ -87,8 +95,11 @@ def test_main(self, ipc_device, nmrs): pgen = PatternGen(device, NBYTES) for buffer in buffers: pgen.verify_buffer(buffer, seed=True) - buffer.close() finally: + for buffer in buffers: + buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() for mr in mrs: mr.close() @@ -124,6 +135,7 @@ def test_main(self, ipc_device, nmrs): device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] + buffers = [] try: buffers = [mr.allocate(NBYTES, stream=device.default_stream) for mr, _ in zip(cycle(mrs), range(NTASKS))] @@ -134,8 +146,11 @@ def test_main(self, ipc_device, nmrs): pgen = PatternGen(device, NBYTES) for buffer in buffers: pgen.verify_buffer(buffer, seed=True) - buffer.close() finally: 
+ for buffer in buffers: + buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() for mr in mrs: mr.close() diff --git a/cuda_core/tests/system/test_system_device.py b/cuda_core/tests/system/test_system_device.py index 4aa13840b48..c202407dc55 100644 --- a/cuda_core/tests/system/test_system_device.py +++ b/cuda_core/tests/system/test_system_device.py @@ -268,6 +268,7 @@ def test_unpack_bitmask_single_value(): _device._unpack_bitmask(1) +@pytest.mark.parallel_threads_limit(4) # timeouts are slow @pytest.mark.skipif(helpers.IS_WSL or helpers.IS_WINDOWS, reason="Events not supported on WSL or Windows") def test_register_events(): # This is not the world's greatest test. All of the events are pretty diff --git a/cuda_core/tests/test_graphics.py b/cuda_core/tests/test_graphics.py index 6f5877f76b0..e2b22a20c59 100644 --- a/cuda_core/tests/test_graphics.py +++ b/cuda_core/tests/test_graphics.py @@ -20,6 +20,9 @@ ) from cuda.core.utils import StridedMemoryView +# TODO(seberg): Maybe some of these tests can be made threadable? 
+pytestmark = pytest.mark.thread_unsafe(reason="OpenGL context not threadable") + # --------------------------------------------------------------------------- # GL context + buffer helpers # --------------------------------------------------------------------------- diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 920cd4bb0fd..1c90d75fd75 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -251,9 +251,8 @@ def _pattern_bytes(value) -> bytes: @pytest.fixture(params=["device", "unified", "pinned"]) -def fill_env(request): - device = Device() - device.set_current() +def fill_env(request, init_cuda): + device = init_cuda if request.param == "device": mr = DummyDeviceMemoryResource(device) elif request.param == "unified": @@ -1103,6 +1102,8 @@ def test_device_memory_resource_with_options(init_cuda): device.sync() dst_buffer.close() src_buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() def test_pinned_memory_resource_with_options(init_cuda): @@ -1149,6 +1150,8 @@ def test_pinned_memory_resource_with_options(init_cuda): device.sync() dst_buffer.close() src_buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() def test_managed_memory_resource_with_options(init_cuda): @@ -1323,6 +1326,7 @@ def test_managed_memory_resource_preferred_location_validation(init_cuda): ) +@pytest.mark.thread_unsafe(reason="Uses mock.") def test_managed_memory_resource_host_numa_auto_resolve_failure(init_cuda): """host_numa with None raises RuntimeError when NUMA ID cannot be determined.""" from unittest.mock import MagicMock, patch @@ -1364,6 +1368,8 @@ def test_mempool_ipc_errors(mempool_device): Buffer.from_ipc_descriptor(mr, handle, stream=device.default_stream) buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() def test_pinned_mempool_ipc_basic(): @@ 
-1404,6 +1410,8 @@ def test_pinned_mempool_ipc_basic(): assert ipc_desc.size == 1024 buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() mr.close() @@ -1435,6 +1443,8 @@ def test_pinned_mempool_ipc_errors(): Buffer.from_ipc_descriptor(mr, handle, stream=device.default_stream) buffer.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + device.sync() mr.close() diff --git a/cuda_core/tests/test_memory_peer_access.py b/cuda_core/tests/test_memory_peer_access.py index 71beb459143..68c32ce69c6 100644 --- a/cuda_core/tests/test_memory_peer_access.py +++ b/cuda_core/tests/test_memory_peer_access.py @@ -12,6 +12,8 @@ NBYTES = 1024 +pytestmark = pytest.mark.thread_unsafe(reason="peer access tests mutate process-global CUDA memory-pool access state") + def test_peer_access_basic(mempool_device_x2): """Basic tests for dmr.peer_accessible_by.""" diff --git a/cuda_core/tests/test_multiprocessing_warning.py b/cuda_core/tests/test_multiprocessing_warning.py index 0f96e0abfbc..94a671ff2f8 100644 --- a/cuda_core/tests/test_multiprocessing_warning.py +++ b/cuda_core/tests/test_multiprocessing_warning.py @@ -12,12 +12,17 @@ import warnings from unittest.mock import patch +import pytest + from cuda.core import DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions from cuda.core._event import _reduce_event from cuda.core._memory._device_memory_resource import _deep_reduce_device_memory_resource from cuda.core._memory._ipc import _reduce_allocation_handle from cuda.core._utils.cuda_utils import check_multiprocessing_start_method, reset_fork_warning +# We could move these to a (session) fixture +pytestmark = pytest.mark.thread_unsafe(reason="all tests use unittest.mock.patch") + def test_warn_on_fork_method_device_memory_resource(ipc_device): """Test that warning is emitted when DeviceMemoryResource is pickled with fork method.""" diff --git
a/cuda_core/tests/test_object_protocols.py b/cuda_core/tests/test_object_protocols.py index d1085a952bb..e4e8ee21c9b 100644 --- a/cuda_core/tests/test_object_protocols.py +++ b/cuda_core/tests/test_object_protocols.py @@ -233,7 +233,11 @@ def sample_ipc_buffer_descriptor(ipc_device): options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mr = DeviceMemoryResource(ipc_device, options=options) buf = mr.allocate(64, stream=ipc_device.default_stream) - return buf.ipc_descriptor + descriptor = buf.ipc_descriptor + buf.close() + # TODO(seberg): 2026-06: mr close may be unsafe with incomplete `buf.close()` + ipc_device.sync() + return descriptor @pytest.fixture @@ -523,6 +527,26 @@ def sample_switch_node_alt(sample_graphdef): return sample_graphdef.switch(condition, 3) +# Indirect-parametrize helpers: request.getfixturevalue() runs here, in the +# fixture (main thread), so the resolved object is already available when the +# test function runs in a worker thread. + + +@pytest.fixture +def sample_object(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def sample_object_a(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def sample_object_b(request): + return request.getfixturevalue(request.param) + + # ============================================================================= # Type groupings # ============================================================================= @@ -718,12 +742,11 @@ def sample_switch_node_alt(sample_graphdef): # ============================================================================= -@pytest.mark.parametrize("fixture_name", WEAKREF_TYPES) -def test_weakref_supported(fixture_name, request): +@pytest.mark.parametrize("sample_object", WEAKREF_TYPES, indirect=True) +def test_weakref_supported(sample_object): """Object supports weak references.""" - obj = request.getfixturevalue(fixture_name) - ref = weakref.ref(obj) - assert ref() is obj + ref = 
weakref.ref(sample_object) + assert ref() is sample_object # ============================================================================= @@ -731,27 +754,22 @@ def test_weakref_supported(fixture_name, request): # ============================================================================= -@pytest.mark.parametrize("fixture_name", HASH_TYPES) -def test_hash_consistency(fixture_name, request): +@pytest.mark.parametrize("sample_object", HASH_TYPES, indirect=True) +def test_hash_consistency(sample_object): """Hash is consistent across multiple calls.""" - obj = request.getfixturevalue(fixture_name) - assert hash(obj) == hash(obj) + assert hash(sample_object) == hash(sample_object) -@pytest.mark.parametrize("a_name,b_name", SAME_TYPE_PAIRS) -def test_hash_distinct_same_type(a_name, b_name, request): +@pytest.mark.parametrize("sample_object_a,sample_object_b", SAME_TYPE_PAIRS, indirect=True) +def test_hash_distinct_same_type(sample_object_a, sample_object_b): """Distinct objects of the same type have different hashes.""" - obj_a = request.getfixturevalue(a_name) - obj_b = request.getfixturevalue(b_name) - assert hash(obj_a) != hash(obj_b) # extremely unlikely + assert hash(sample_object_a) != hash(sample_object_b) # extremely unlikely -@pytest.mark.parametrize("a_name,b_name", itertools.combinations(HASH_TYPES, 2)) -def test_hash_distinct_cross_type(a_name, b_name, request): +@pytest.mark.parametrize("sample_object_a,sample_object_b", itertools.combinations(HASH_TYPES, 2), indirect=True) +def test_hash_distinct_cross_type(sample_object_a, sample_object_b): """Distinct objects of different types have different hashes.""" - obj_a = request.getfixturevalue(a_name) - obj_b = request.getfixturevalue(b_name) - assert hash(obj_a) != hash(obj_b) # extremely unlikely + assert hash(sample_object_a) != hash(sample_object_b) # extremely unlikely # ============================================================================= @@ -759,41 +777,35 @@ def 
test_hash_distinct_cross_type(a_name, b_name, request): # ============================================================================= -@pytest.mark.parametrize("fixture_name", EQ_TYPES) -def test_equality_basic(fixture_name, request): +@pytest.mark.parametrize("sample_object", EQ_TYPES, indirect=True) +def test_equality_basic(sample_object): """Object equality: reflexive, not equal to None or other types.""" - obj = request.getfixturevalue(fixture_name) - assert obj == obj - assert obj is not None - assert obj != "string" - if hasattr(obj, "handle"): - assert obj != obj.handle + assert sample_object == sample_object + assert sample_object is not None + assert sample_object != "string" + if hasattr(sample_object, "handle"): + assert sample_object != sample_object.handle -@pytest.mark.parametrize("a_name,b_name", itertools.combinations(EQ_TYPES, 2)) -def test_no_cross_type_equality(a_name, b_name, request): +@pytest.mark.parametrize("sample_object_a,sample_object_b", itertools.combinations(EQ_TYPES, 2), indirect=True) +def test_no_cross_type_equality(sample_object_a, sample_object_b): """No two distinct objects of different types should compare equal.""" - obj_a = request.getfixturevalue(a_name) - obj_b = request.getfixturevalue(b_name) - assert obj_a != obj_b + assert sample_object_a != sample_object_b -@pytest.mark.parametrize("a_name,b_name", SAME_TYPE_PAIRS) -def test_same_type_inequality(a_name, b_name, request): +@pytest.mark.parametrize("sample_object_a,sample_object_b", SAME_TYPE_PAIRS, indirect=True) +def test_same_type_inequality(sample_object_a, sample_object_b): """Two distinct objects of the same type should not compare equal.""" - obj_a = request.getfixturevalue(a_name) - obj_b = request.getfixturevalue(b_name) - assert obj_a is not obj_b - assert obj_a != obj_b + assert sample_object_a is not sample_object_b + assert sample_object_a != sample_object_b -@pytest.mark.parametrize("fixture_name,copy_fn", FROM_HANDLE_COPIES) -def 
test_equality_same_handle(fixture_name, copy_fn, request): +@pytest.mark.parametrize("sample_object,copy_fn", FROM_HANDLE_COPIES, indirect=["sample_object"]) +def test_equality_same_handle(sample_object, copy_fn): """Two wrappers around the same handle should compare equal.""" - obj = request.getfixturevalue(fixture_name) - obj2 = copy_fn(obj) - assert obj == obj2 - assert hash(obj) == hash(obj2) + obj2 = copy_fn(sample_object) + assert sample_object == obj2 + assert hash(sample_object) == hash(obj2) # ============================================================================= @@ -801,48 +813,43 @@ def test_equality_same_handle(fixture_name, copy_fn, request): # ============================================================================= -@pytest.mark.parametrize("fixture_name", DICT_KEY_TYPES) -def test_usable_as_dict_key(fixture_name, request): +@pytest.mark.parametrize("sample_object", DICT_KEY_TYPES, indirect=True) +def test_usable_as_dict_key(sample_object): """Object can be used as a dictionary key.""" - obj = request.getfixturevalue(fixture_name) - d = {obj: "value"} - assert d[obj] == "value" - assert obj in d + d = {sample_object: "value"} + assert d[sample_object] == "value" + assert sample_object in d -@pytest.mark.parametrize("fixture_name", DICT_KEY_TYPES) -def test_usable_in_set(fixture_name, request): +@pytest.mark.parametrize("sample_object", DICT_KEY_TYPES, indirect=True) +def test_usable_in_set(sample_object): """Object can be added to a set.""" - obj = request.getfixturevalue(fixture_name) - s = {obj} - assert obj in s + s = {sample_object} + assert sample_object in s -@pytest.mark.parametrize("fixture_name", WEAKREF_TYPES) -def test_usable_in_weak_value_dict(fixture_name, request): +@pytest.mark.parametrize("sample_object", WEAKREF_TYPES, indirect=True) +def test_usable_in_weak_value_dict(sample_object): """Object can be used as a WeakValueDictionary value.""" - obj = request.getfixturevalue(fixture_name) wvd = weakref.WeakValueDictionary() - 
wvd["key"] = obj - assert wvd["key"] is obj + wvd["key"] = sample_object + assert wvd["key"] is sample_object -@pytest.mark.parametrize("fixture_name", WEAK_KEY_TYPES) -def test_usable_in_weak_key_dict(fixture_name, request): +@pytest.mark.parametrize("sample_object", WEAK_KEY_TYPES, indirect=True) +def test_usable_in_weak_key_dict(sample_object): """Object can be used as a WeakKeyDictionary key.""" - obj = request.getfixturevalue(fixture_name) wkd = weakref.WeakKeyDictionary() - wkd[obj] = "value" - assert wkd[obj] == "value" + wkd[sample_object] = "value" + assert wkd[sample_object] == "value" -@pytest.mark.parametrize("fixture_name", WEAK_KEY_TYPES) -def test_usable_in_weak_set(fixture_name, request): +@pytest.mark.parametrize("sample_object", WEAK_KEY_TYPES, indirect=True) +def test_usable_in_weak_set(sample_object): """Object can be added to a WeakSet.""" - obj = request.getfixturevalue(fixture_name) ws = weakref.WeakSet() - ws.add(obj) - assert obj in ws + ws.add(sample_object) + assert sample_object in ws # ============================================================================= @@ -850,12 +857,10 @@ def test_usable_in_weak_set(fixture_name, request): # ============================================================================= -@pytest.mark.parametrize("fixture_name,pattern", REPR_PATTERNS) -def test_repr_format(fixture_name, pattern, request): +@pytest.mark.parametrize("sample_object,pattern", REPR_PATTERNS, indirect=["sample_object"]) +def test_repr_format(sample_object, pattern): """repr() returns a properly formatted string.""" - obj = request.getfixturevalue(fixture_name) - result = repr(obj) - assert re.fullmatch(pattern, result) + assert re.fullmatch(pattern, repr(sample_object)) # ============================================================================= @@ -864,10 +869,9 @@ def test_repr_format(fixture_name, pattern, request): @pytest.mark.parametrize("pickle_module", PICKLE_MODULES) -@pytest.mark.parametrize("fixture_name", PICKLE_TYPES) 
-def test_pickle_roundtrip(fixture_name, pickle_module, request): +@pytest.mark.parametrize("sample_object", PICKLE_TYPES, indirect=True) +def test_pickle_roundtrip(sample_object, pickle_module): """Object survives a pickle/cloudpickle roundtrip.""" mod = pytest.importorskip(pickle_module) - obj = request.getfixturevalue(fixture_name) - result = mod.loads(mod.dumps(obj)) - assert type(result) is type(obj) + result = mod.loads(mod.dumps(sample_object)) + assert type(result) is type(sample_object) diff --git a/cuda_core/tests/test_program_cache.py b/cuda_core/tests/test_program_cache.py index 01a39e0032c..963ec1cc04b 100644 --- a/cuda_core/tests/test_program_cache.py +++ b/cuda_core/tests/test_program_cache.py @@ -1927,6 +1927,7 @@ def test_filestream_cache_tracker_reconciles_after_external_drift(tmp_path): assert cache._tracked_size_bytes <= 1100 # actual on-disk is 'b' + 'c' or just 'c' +@pytest.mark.thread_unsafe(reason="already threaded and patches _file_stream") def test_filestream_cache_tracker_clamps_at_zero_under_delete_race(tmp_path): """Two-thread reproduction of the ``__delitem__`` vs ``_enforce_size_cap`` race. 
Thread A is mid-delete: it has stat'd the diff --git a/cuda_pathfinder/pyproject.toml b/cuda_pathfinder/pyproject.toml index dfbef9dd18d..32fec0f13d1 100644 --- a/cuda_pathfinder/pyproject.toml +++ b/cuda_pathfinder/pyproject.toml @@ -102,6 +102,7 @@ git_describe_command = [ "git", "describe", "--dirty", "--tags", "--long", "--ma [tool.pytest.ini_options] addopts = "--showlocals" +thread_unsafe_fixtures = ['mocker'] [tool.mypy] # Try to keep the mypy configuration similar between the subprojects diff --git a/cuda_pathfinder/tests/test_find_nvidia_binaries.py b/cuda_pathfinder/tests/test_find_nvidia_binaries.py index ec9740cd853..0f9e5ed31c1 100644 --- a/cuda_pathfinder/tests/test_find_nvidia_binaries.py +++ b/cuda_pathfinder/tests/test_find_nvidia_binaries.py @@ -173,6 +173,7 @@ def test_find_binary_cache_negative_result(monkeypatch, mocker): @pytest.mark.usefixtures("clear_find_binary_cache") +@pytest.mark.thread_unsafe(reason="functools.cache may replace entry.") def test_caching_per_utility(): """Verify that different utilities have independent cache entries.""" nvdisasm1 = find_nvidia_binary_utility("nvdisasm") diff --git a/scripts/run_tests.sh b/scripts/run_tests.sh index 163ff70a997..99d19753693 100755 --- a/scripts/run_tests.sh +++ b/scripts/run_tests.sh @@ -7,7 +7,7 @@ set -euo pipefail # Simple, dependency-free orchestrator to run tests for all packages. 
# Usage: -# scripts/run_tests.sh [ -v|--verbose ] [ --install | --no-install ] [ --with-cython | --skip-cython ] [ --with-examples | --skip-examples ] [ --with-ptds ] +# scripts/run_tests.sh [ -v|--verbose ] [ --install | --no-install ] [ --with-cython | --skip-cython ] [ --with-examples | --skip-examples ] [ --with-ptds ] [ --parallel-threads=N ] # scripts/run_tests.sh [ flags ] # pathfinder -> bindings -> core # scripts/run_tests.sh [ flags ] core # only core # scripts/run_tests.sh [ flags ] bindings # only bindings @@ -38,6 +38,9 @@ Options: --with-examples Run examples where applicable (e.g., cuda_bindings/examples) --skip-examples Skip running examples (default) --with-ptds Re-run cuda_bindings tests with PTDS (CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM=1) + --parallel-threads=N + Run pytest with --parallel-threads=N. Defaults to 4 when + pytest-run-parallel is installed, otherwise 0 (disabled). -h, --help Show this help and exit Examples: @@ -54,6 +57,7 @@ RUN_CYTHON=0 RUN_EXAMPLES=1 RUN_PTDS=1 INSTALL_MODE=auto # auto|force|skip +PARALLEL_THREADS=0 while [[ $# -gt 0 ]]; do case "$1" in -h|--help) @@ -92,6 +96,18 @@ while [[ $# -gt 0 ]]; do RUN_PTDS=1 shift ;; + --parallel-threads=*) + PARALLEL_THREADS="${1#*=}" + shift + ;; + --parallel-threads) + if [[ $# -lt 2 ]]; then + echo "Missing value for --parallel-threads" >&2 + exit 1 + fi + PARALLEL_THREADS="$2" + shift 2 + ;; *) break ;; @@ -100,12 +116,21 @@ done target=${1:-all} +if ! 
[[ "${PARALLEL_THREADS}" =~ ^[0-9]+$ ]]; then + echo "--parallel-threads must be a non-negative integer, got: ${PARALLEL_THREADS}" >&2 + exit 1 +fi + if [[ ${VERBOSE} -eq 1 ]]; then PYTEST_FLAGS=( -ra -s -v ) else # Very quiet: show failures/errors summary only PYTEST_FLAGS=( -qq ) fi +PYTEST_PARALLEL_FLAGS=() +if [[ "${PARALLEL_THREADS}" -gt 0 ]]; then + PYTEST_PARALLEL_FLAGS=( "--parallel-threads=${PARALLEL_THREADS}" ) +fi declare -A RESULTS ORDERED_RESULTS=() @@ -133,7 +158,7 @@ status_from_rc() { run_pytest() { # Run pytest safely under set -e and return its exit code set +e - CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM=0 python -m pytest "${PYTEST_FLAGS[@]}" "$@" + CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM=0 python -m pytest "${PYTEST_FLAGS[@]}" "${PYTEST_PARALLEL_FLAGS[@]}" "$@" local rc=$? set -e return ${rc} @@ -142,7 +167,7 @@ run_pytest() { run_pytest_ptds() { # Run pytest with PTDS env set; safely return its exit code set +e - CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM=1 python -m pytest "${PYTEST_FLAGS[@]}" "$@" + CUDA_PYTHON_CUDA_PER_THREAD_DEFAULT_STREAM=1 python -m pytest "${PYTEST_FLAGS[@]}" "${PYTEST_PARALLEL_FLAGS[@]}" "$@" local rc=$? set -e return ${rc}