From 4f6e83169bb571b2744e0bfa6f1629f5e199f44c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 3 Jan 2024 12:56:24 +0100 Subject: [PATCH] move dask code into xarray/backends/locks.py --- xarray/backends/dask_lock.py | 107 ------------------------------- xarray/backends/locks.py | 79 ++++++++++++++++++++++- xarray/tests/test_distributed.py | 3 +- 3 files changed, 77 insertions(+), 112 deletions(-) delete mode 100644 xarray/backends/dask_lock.py diff --git a/xarray/backends/dask_lock.py b/xarray/backends/dask_lock.py deleted file mode 100644 index 1541a789648..00000000000 --- a/xarray/backends/dask_lock.py +++ /dev/null @@ -1,107 +0,0 @@ -# The code in this module is taken from dask. For reference, -# here is a copy of the dask copyright notice: - -# BSD 3-Clause License -# -# Copyright (c) 2014, Anaconda, Inc. and contributors -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from __future__ import annotations - -import uuid -from collections.abc import Hashable -from threading import Lock -from typing import ClassVar -from weakref import WeakValueDictionary - - -class SerializableLock: - """A Serializable per-process Lock - - This wraps a normal ``threading.Lock`` object and satisfies the same - interface. However, this lock can also be serialized and sent to different - processes. It will not block concurrent operations between processes (for - this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file`` - but will consistently deserialize into the same lock. - - So if we make a lock in one process:: - - lock = SerializableLock() - - And then send it over to another process multiple times:: - - bytes = pickle.dumps(lock) - a = pickle.loads(bytes) - b = pickle.loads(bytes) - - Then the deserialized objects will operate as though they were the same - lock, and collide as appropriate. - - This is useful for consistently protecting resources on a per-process - level. - - The creation of locks is itself not threadsafe. 
- """ - - _locks: ClassVar[WeakValueDictionary[Hashable, Lock]] = WeakValueDictionary() - token: Hashable - lock: Lock - - def __init__(self, token: Hashable | None = None): - self.token = token or str(uuid.uuid4()) - if self.token in SerializableLock._locks: - self.lock = SerializableLock._locks[self.token] - else: - self.lock = Lock() - SerializableLock._locks[self.token] = self.lock - - def acquire(self, *args, **kwargs): - return self.lock.acquire(*args, **kwargs) - - def release(self, *args, **kwargs): - return self.lock.release(*args, **kwargs) - - def __enter__(self): - self.lock.__enter__() - - def __exit__(self, *args): - self.lock.__exit__(*args) - - def locked(self): - return self.lock.locked() - - def __getstate__(self): - return self.token - - def __setstate__(self, token): - self.__init__(token) - - def __str__(self): - return f"<{self.__class__.__name__}: {self.token}>" - - __repr__ = __str__ diff --git a/xarray/backends/locks.py b/xarray/backends/locks.py index 358dc91ffb2..045ee522fa8 100644 --- a/xarray/backends/locks.py +++ b/xarray/backends/locks.py @@ -2,11 +2,84 @@ import multiprocessing import threading +import uuid import weakref -from collections.abc import MutableMapping -from typing import Any +from collections.abc import Hashable, MutableMapping +from typing import Any, ClassVar +from weakref import WeakValueDictionary + + +# SerializableLock is adapted from Dask: +# https://github.com/dask/dask/blob/74e898f0ec712e8317ba86cc3b9d18b6b9922be0/dask/utils.py#L1160-L1224 +# Used under the terms of Dask's license, see licenses/DASK_LICENSE. +class SerializableLock: + """A Serializable per-process Lock + + This wraps a normal ``threading.Lock`` object and satisfies the same + interface. However, this lock can also be serialized and sent to different + processes. 
It will not block concurrent operations between processes (for
+    this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file``)
+    but will consistently deserialize into the same lock.
+
+    So if we make a lock in one process::
+
+        lock = SerializableLock()
+
+    And then send it over to another process multiple times::
+
+        bytes = pickle.dumps(lock)
+        a = pickle.loads(bytes)
+        b = pickle.loads(bytes)
+
+    Then the deserialized objects will operate as though they were the same
+    lock, and collide as appropriate.
+
+    This is useful for consistently protecting resources on a per-process
+    level.
+
+    The creation of locks is itself not threadsafe.
+    """
+
+    _locks: ClassVar[
+        WeakValueDictionary[Hashable, threading.Lock]
+    ] = WeakValueDictionary()
+    token: Hashable
+    lock: threading.Lock
+
+    def __init__(self, token: Hashable | None = None):
+        self.token = token or str(uuid.uuid4())
+        if self.token in SerializableLock._locks:
+            self.lock = SerializableLock._locks[self.token]
+        else:
+            self.lock = threading.Lock()
+            SerializableLock._locks[self.token] = self.lock
+
+    def acquire(self, *args, **kwargs):
+        return self.lock.acquire(*args, **kwargs)
+
+    def release(self, *args, **kwargs):
+        return self.lock.release(*args, **kwargs)
+
+    def __enter__(self):
+        self.lock.__enter__()
+
+    def __exit__(self, *args):
+        self.lock.__exit__(*args)
+
+    def locked(self):
+        return self.lock.locked()
+
+    def __getstate__(self):
+        return self.token
+
+    def __setstate__(self, token):
+        self.__init__(token)
+
+    def __str__(self):
+        return f"<{self.__class__.__name__}: {self.token}>"
+
+    __repr__ = __str__
 
-from xarray.backends.dask_lock import SerializableLock
 
 # Locks used by multiple backends.
 # Neither HDF5 nor the netCDF-C library are thread-safe.
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index af801966616..aa53bcf329b 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -27,8 +27,7 @@ ) import xarray as xr -from xarray.backends.dask_lock import SerializableLock -from xarray.backends.locks import HDF5_LOCK, CombinedLock +from xarray.backends.locks import HDF5_LOCK, CombinedLock, SerializableLock from xarray.tests import ( assert_allclose, assert_identical,