Commit 0fa004e1, authored by Michael Osthege, committed by Brandon T. Willard

Replace custom locking mechanism with filelock

Closes #203
Parent a2f9752f
...@@ -87,7 +87,7 @@ def main(): ...@@ -87,7 +87,7 @@ def main():
cache = get_module_cache(init_args=dict(do_refresh=False)) cache = get_module_cache(init_args=dict(do_refresh=False))
cache.clear_old() cache.clear_old()
elif sys.argv[1] == "unlock": elif sys.argv[1] == "unlock":
theano.compile.compilelock.force_unlock() theano.compile.compilelock.force_unlock(config.compiledir)
print("Lock successfully removed!") print("Lock successfully removed!")
elif sys.argv[1] == "purge": elif sys.argv[1] == "purge":
theano.compile.compiledir.compiledir_purge() theano.compile.compiledir.compiledir_purge()
......
...@@ -57,7 +57,7 @@ if __name__ == "__main__": ...@@ -57,7 +57,7 @@ if __name__ == "__main__":
license=LICENSE, license=LICENSE,
platforms=PLATFORMS, platforms=PLATFORMS,
packages=find_packages(exclude=["tests.*"]), packages=find_packages(exclude=["tests.*"]),
install_requires=["numpy>=1.9.1", "scipy>=0.14"], install_requires=["numpy>=1.9.1", "scipy>=0.14", "filelock"],
package_data={ package_data={
"": [ "": [
"*.txt", "*.txt",
......
...@@ -2,14 +2,12 @@ ...@@ -2,14 +2,12 @@
Locking mechanism to ensure no two compilations occur simultaneously Locking mechanism to ensure no two compilations occur simultaneously
in the same compilation directory (which can cause crashes). in the same compilation directory (which can cause crashes).
""" """
import atexit
import logging
import os import os
import socket import threading
import time import typing
from contextlib import contextmanager from contextlib import contextmanager
import numpy as np import filelock
from theano.configdefaults import config from theano.configdefaults import config
...@@ -20,362 +18,61 @@ __all__ = [ ...@@ -20,362 +18,61 @@ __all__ = [
] ]
_random = np.random.RandomState([2015, 8, 2]) local_mem = threading.local()
local_mem._locks: typing.Dict[str, bool] = {}
_logger = logging.getLogger("theano.compile.compilelock")
# If the user provided a logging level, we don't want to override it.
if _logger.level == logging.NOTSET:
# INFO will show the "Refreshing lock" messages
_logger.setLevel(logging.INFO)
def force_unlock(lock_dir: os.PathLike):
    """Forces the release of the lock on a specific directory.

    Parameters
    ----------
    lock_dir : os.PathLike
        Path to a directory that was locked with `lock_ctx`.
    """
    # NOTE(review): `FileLock.release` only acts on a lock this process
    # actually holds; verify that force-releasing another process's lock
    # behaves as intended with the installed filelock version.
    stale = filelock.FileLock(os.path.join(lock_dir, ".lock"))
    stale.release(force=True)

    # Drop the per-process bookkeeping entry, if any, so a later
    # `lock_ctx` call on the same directory re-acquires the file lock.
    bookkeeping_key = f"{lock_dir}-{os.getpid()}"
    local_mem._locks.pop(bookkeeping_key, None)
@contextmanager
def lock_ctx(lock_dir: os.PathLike = None, *, timeout: typing.Optional[float] = -1):
    """Context manager that wraps around FileLock and SoftFileLock from filelock package.

    Acquires an inter-process file lock on ``lock_dir`` for the duration of
    the ``with`` block.  Re-entrant within one process: if this process
    already holds the lock on that directory, the inner context is a no-op.

    Parameters
    ----------
    lock_dir : str
        A directory for which to acquire the lock.
        Defaults to the config.compiledir.
    timeout : float
        Timeout in seconds for waiting in lock acquisition.
        Defaults to config.compile__timeout.
        ``None`` waits forever; otherwise the value must be positive.

    Raises
    ------
    ValueError
        If ``timeout`` is neither ``None``, the ``-1`` sentinel, nor positive.
    """
    if lock_dir is None:
        lock_dir = config.compiledir
    if timeout == -1:
        # -1 is the "not given" sentinel: fall back to the configured timeout.
        timeout = config.compile__timeout
    elif timeout is not None and timeout <= 0:
        # BUGFIX: was `timeout is not None or timeout <= 0`, which rejected
        # every explicit positive timeout and raised TypeError for None.
        raise ValueError(f"Timeout parameter must be None or positive. Got {timeout}.")

    # locks are kept in a dictionary to account for changing compiledirs
    dir_key = f"{lock_dir}-{os.getpid()}"

    if dir_key not in local_mem._locks:
        # First acquisition in this process: take the real file lock.
        local_mem._locks[dir_key] = True
        fl = filelock.FileLock(os.path.join(lock_dir, ".lock"))
        fl.acquire(timeout=timeout)
        try:
            yield
        finally:
            # Release even if the body raised, and forget the bookkeeping
            # entry so the next lock_ctx call re-acquires the lock.
            if fl.is_locked:
                fl.release()
            if dir_key in local_mem._locks:
                del local_mem._locks[dir_key]
    else:
        # Re-entrant use: this process already holds the directory lock.
        yield
def refresh_lock(lock_file):
    """
    'Refresh' an existing lock by re-writing the file containing the owner's
    unique id, using a new (randomly generated) id, which is also returned.
    """
    # Owner id format: "<pid>_<10 random digits>_<hostname>".
    digits = "".join(str(_random.randint(0, 9)) for _ in range(10))
    unique_id = f"{os.getpid()}_{digits}_{socket.gethostname()}"
    try:
        with open(lock_file, "w") as fh:
            fh.write(unique_id + "\n")
    except Exception:
        # In some strange case, this happen. To prevent all tests
        # from failing, we release the lock, but as there is a
        # problem, we still keep the original exception.
        # This way, only 1 test would fail.
        while get_lock.n_lock > 0:
            release_lock()
        _logger.warning(
            "Refreshing lock failed, we release the"
            " lock before raising again the exception"
        )
        raise
    return unique_id
class Unlocker:
    """
    Wrapper around the lock-release mechanism so that the lock can be
    released automatically when the program exits (even on a crash or an
    interrupt), via registration of its ``unlock`` method.
    """

    def __init__(self, tmp_dir):
        # Directory whose "lock" file marks ownership of the compile lock.
        self.tmp_dir = tmp_dir

    def unlock(self, force=False):
        """
        Remove the current lock.

        This method never raises on filesystem errors: multiple jobs running
        in parallel are allowed to unlock the same directory at the same time
        (e.g. when they all reach their timeout limit), so every step is
        attempted independently and failures are silently ignored.

        Parameters
        ----------
        force : bool
            When False, first check that the lock file still names this
            process on this host; if somebody else took over the lock in the
            meantime, leave it alone.
        """
        lock_file = os.path.join(self.tmp_dir, "lock")
        if not force:
            # Check whether someone else took our lock; any parse/read
            # failure falls through to the removal attempts below.
            try:
                with open(lock_file) as fh:
                    owner = fh.readlines()[0].strip()
                pid, _, hname = owner.split("_")
                if pid != str(os.getpid()) or hname != socket.gethostname():
                    return
            except Exception:
                pass
        # The file and the directory are removed in *separate* try blocks on
        # purpose: even if removing the file fails (e.g. it is already gone),
        # we still want to attempt removing the directory.
        try:
            os.remove(lock_file)
        except Exception:
            pass
        try:
            os.rmdir(self.tmp_dir)
        except Exception:
            pass
Markdown format
0%
You added 0 people to this discussion. Please proceed with caution.
Please finish editing this comment first!
Register or sign in to comment