提交 d1222d9c authored 作者: Arnaud Bergeron's avatar Arnaud Bergeron

Cleanup refresh to make the cleanup optional and avoid taking the lock

if we don't delete anything.
上级 57bea857
...@@ -627,8 +627,7 @@ class ModuleCache(object): ...@@ -627,8 +627,7 @@ class ModuleCache(object):
self.stats[0] += 1 self.stats[0] += 1
return self.module_from_name[name] return self.module_from_name[name]
def refresh(self, age_thresh_use=None, delete_if_problem=False, def refresh(self, age_thresh_use=None, delete_if_problem=False, cleanup=True):
no_clean_empty=False):
"""Update cache data by walking the cache directory structure. """Update cache data by walking the cache directory structure.
Load key.pkl files that have not been loaded yet. Load key.pkl files that have not been loaded yet.
...@@ -638,12 +637,15 @@ class ModuleCache(object): ...@@ -638,12 +637,15 @@ class ModuleCache(object):
:param age_thresh_use: Do not use modules older than this. :param age_thresh_use: Do not use modules older than this.
Defaults to self.age_thresh_use. Defaults to self.age_thresh_use.
:param delete_if_problem: If True, cache entries that meet one of those :param delete_if_problem: If True, cache entries that meet one
two conditions are deleted: of those two conditions are deleted:
- Those for which unpickling the KeyData file fails with an - Those for which unpickling the KeyData file fails with
unknown exception. an unknown exception.
- Duplicated modules, regardless of their age. - Duplicated modules, regardless of their age.
:param cleanup: Do a cleanup of the cache removing expired and
broken modules.
:returns: a list of modules of age higher than age_thresh_use. :returns: a list of modules of age higher than age_thresh_use.
""" """
if age_thresh_use is None: if age_thresh_use is None:
...@@ -651,238 +653,244 @@ class ModuleCache(object): ...@@ -651,238 +653,244 @@ class ModuleCache(object):
start_time = time.time() start_time = time.time()
too_old_to_use = [] too_old_to_use = []
compilelock.get_lock() to_delete = []
try: def rmtree(*args, **kwargs):
# add entries that are not in the entry_from_key dictionary if cleanup:
time_now = time.time() to_delete.append((args, kwargs))
# Go through directories in alphabetical order to ensure consistent
# behavior. # add entries that are not in the entry_from_key dictionary
subdirs = sorted(os.listdir(self.dirname)) time_now = time.time()
for root in subdirs: # Go through directories in alphabetical order to ensure consistent
root = os.path.join(self.dirname, root) # behavior.
key_pkl = os.path.join(root, 'key.pkl') subdirs = sorted(os.listdir(self.dirname))
if key_pkl in self.loaded_key_pkl: for root in subdirs:
continue root = os.path.join(self.dirname, root)
if not os.path.isdir(root): key_pkl = os.path.join(root, 'key.pkl')
continue if key_pkl in self.loaded_key_pkl:
files = os.listdir(root) continue
if (not files and not no_clean_empty) or 'delete.me' in files: if not os.path.isdir(root):
_rmtree(root, ignore_nocleanup=True, continue
msg="delete.me found in dir") files = os.listdir(root)
continue if not files or 'delete.me' in files:
elif 'key.pkl' in files: rmtree(root, ignore_nocleanup=True,
msg="delete.me found in dir")
continue
elif 'key.pkl' in files:
try:
entry = module_name_from_dir(root, files=files)
except ValueError: # there is a key but no dll!
if not root.startswith("/tmp"):
# Under /tmp, files are removed periodically by the
# os. So it is normal that this happens from time
# to time.
_logger.warning("ModuleCache.refresh() Found key "
"without dll in cache, deleting it. %s",
key_pkl)
rmtree(root, ignore_nocleanup=True,
msg="missing module file", level=logging.INFO)
continue
if (time_now - last_access_time(entry)) < age_thresh_use:
_logger.debug('refresh adding %s', key_pkl)
def unpickle_failure():
_logger.info("ModuleCache.refresh() Failed to "
"unpickle cache file %s", key_pkl)
try: try:
entry = module_name_from_dir(root, files=files) with open(key_pkl, 'rb') as f:
except ValueError: # there is a key but no dll! key_data = cPickle.load(f)
if not root.startswith("/tmp"): except EOFError:
# Under /tmp, file are removed periodically by the # Happened once... not sure why (would be worth
# os. So it is normal that this happens from time # investigating if it ever happens again).
# to time. unpickle_failure()
_logger.warning("ModuleCache.refresh() Found key " rmtree(root, ignore_nocleanup=True,
"without dll in cache, deleting it. %s", msg='broken cache directory [EOF]',
key_pkl) level=logging.WARNING)
_rmtree(root, ignore_nocleanup=True, continue
msg="missing module file", level=logging.INFO) except ValueError:
# This can happen when we have bad config value
# in the cuda.nvcc_compiler.py file.
# We should not hide it here, as this will cause
# an unrelated error to appear.
raise
except Exception:
unpickle_failure()
if delete_if_problem:
rmtree(root, ignore_nocleanup=True,
msg='broken cache directory',
level=logging.INFO)
else:
# This exception is often triggered by keys
# that contain references to classes that have
# not yet been imported (e.g. when running two
# different Theano-based scripts). They are not
# necessarily broken, but we cannot load them
# here.
pass
continue continue
if (time_now - last_access_time(entry)) < age_thresh_use:
_logger.debug('refresh adding %s', key_pkl)
def unpickle_failure():
_logger.info("ModuleCache.refresh() Failed to "
"unpickle cache file %s", key_pkl)
try:
with open(key_pkl, 'rb') as f:
key_data = cPickle.load(f)
except EOFError:
# Happened once... not sure why (would be worth
# investigating if it ever happens again).
unpickle_failure()
_rmtree(root, ignore_nocleanup=True,
msg='broken cache directory [EOF]',
level=logging.WARNING)
continue
except ValueError:
# This can happen when we have bad config value
# in the cuda.nvcc_compiler.py file.
# We should not hide it here, as this will cause
# an unrelated error to appear.
raise
except Exception:
unpickle_failure()
if delete_if_problem:
_rmtree(root, ignore_nocleanup=True,
msg='broken cache directory',
level=logging.INFO)
else:
# This exception is often triggered by keys
# that contain references to classes that have
# not yet been imported (e.g. when running two
# different Theano-based scripts). They are not
# necessarily broken, but we cannot load them
# here.
pass
continue
if not isinstance(key_data, KeyData): if not isinstance(key_data, KeyData):
# This is some old cache data, that does not fit # This is some old cache data, that does not fit
# the new cache format. It would be possible to # the new cache format. It would be possible to
# update it, but it is not entirely safe since we # update it, but it is not entirely safe since we
# do not know the config options that were used. # do not know the config options that were used.
# As a result, we delete it instead (which is also # As a result, we delete it instead (which is also
# simpler to implement). # simpler to implement).
_rmtree(root, ignore_nocleanup=True, rmtree(root, ignore_nocleanup=True,
msg=( msg=(
'invalid cache entry format -- this ' 'invalid cache entry format -- this '
'should not happen unless your cache ' 'should not happen unless your cache '
'was really old'), 'was really old'),
level=logging.WARN) level=logging.WARN)
continue continue
# Check the path to the module stored in the KeyData # Check the path to the module stored in the KeyData
# object matches the path to `entry`. There may be # object matches the path to `entry`. There may be
# a mismatch e.g. due to symlinks, or some directory # a mismatch e.g. due to symlinks, or some directory
# being renamed since last time cache was created. # being renamed since last time cache was created.
kd_entry = key_data.get_entry() kd_entry = key_data.get_entry()
if kd_entry != entry: if kd_entry != entry:
if is_same_entry(entry, kd_entry): if is_same_entry(entry, kd_entry):
# Update KeyData object. Note that we also need # Update KeyData object. Note that we also need
# to update the key_pkl field, because it is # to update the key_pkl field, because it is
# likely to be incorrect if the entry itself # likely to be incorrect if the entry itself
# was wrong. # was wrong.
key_data.entry = entry key_data.entry = entry
key_data.key_pkl = key_pkl key_data.key_pkl = key_pkl
else: else:
# This is suspicious. Better get rid of it. # This is suspicious. Better get rid of it.
_rmtree(root, ignore_nocleanup=True, rmtree(root, ignore_nocleanup=True,
msg='module file path mismatch', msg='module file path mismatch',
level=logging.INFO) level=logging.INFO)
continue continue
# Find unversioned keys from other processes. # Find unversioned keys from other processes.
# TODO: check if this can happen at all # TODO: check if this can happen at all
to_del = [key for key in key_data.keys if not key[0]] to_del = [key for key in key_data.keys if not key[0]]
if to_del: if to_del:
_logger.warning(
"ModuleCache.refresh() Found unversioned "
"key in cache, removing it. %s", key_pkl)
# Since the version is in the module hash, all
# keys should be unversioned.
if len(to_del) != len(key_data.keys):
_logger.warning( _logger.warning(
"ModuleCache.refresh() Found unversioned " 'Found a mix of unversioned and '
"key in cache, removing it. %s", key_pkl) 'versioned keys for the same '
# Since the version is in the module hash, all 'module %s', key_pkl)
# keys should be unversioned. rmtree(root, ignore_nocleanup=True,
if len(to_del) != len(key_data.keys): msg="unversioned key(s) in cache",
_logger.warning( level=logging.INFO)
'Found a mix of unversioned and ' continue
'versioned keys for the same '
'module %s', key_pkl)
_rmtree(root, ignore_nocleanup=True,
msg="unversioned key(s) in cache",
level=logging.INFO)
continue
mod_hash = key_data.module_hash mod_hash = key_data.module_hash
if mod_hash in self.module_hash_to_key_data: if mod_hash in self.module_hash_to_key_data:
# This may happen when two processes running # This may happen when two processes running
# simultaneously compiled the same module, one # simultaneously compiled the same module, one
# after the other. We delete one once it is old # after the other. We delete one once it is old
# enough (to be confident there is no other process # enough (to be confident there is no other process
# using it), or if `delete_if_problem` is True. # using it), or if `delete_if_problem` is True.
# Note that it is important to walk through # Note that it is important to walk through
# directories in alphabetical order so as to make # directories in alphabetical order so as to make
# sure all new processes only use the first one. # sure all new processes only use the first one.
if cleanup:
age = time.time() - last_access_time(entry) age = time.time() - last_access_time(entry)
if delete_if_problem or age > self.age_thresh_del: if delete_if_problem or age > self.age_thresh_del:
_rmtree(root, ignore_nocleanup=True, rmtree(root, ignore_nocleanup=True,
msg='duplicated module', msg='duplicated module',
level=logging.DEBUG) level=logging.DEBUG)
else: else:
_logger.debug('Found duplicated module not ' _logger.debug('Found duplicated module not '
'old enough yet to be deleted ' 'old enough yet to be deleted '
'(age: %s): %s', '(age: %s): %s',
age, entry) age, entry)
continue continue
# Remember the map from a module's hash to the KeyData # Remember the map from a module's hash to the KeyData
# object associated with it. # object associated with it.
self.module_hash_to_key_data[mod_hash] = key_data self.module_hash_to_key_data[mod_hash] = key_data
for key in key_data.keys: for key in key_data.keys:
if key not in self.entry_from_key: if key not in self.entry_from_key:
self.entry_from_key[key] = entry self.entry_from_key[key] = entry
# Assert that we have not already got this # Assert that we have not already got this
# entry somehow. # entry somehow.
assert entry not in self.module_from_name assert entry not in self.module_from_name
# Store safe part of versioned keys. # Store safe part of versioned keys.
if key[0]: if key[0]:
self.similar_keys.setdefault( self.similar_keys.setdefault(
get_safe_part(key), get_safe_part(key),
[]).append(key) []).append(key)
else: else:
_logger.warning( _logger.warning(
"The same cache key is associated to " "The same cache key is associated to "
"different modules (%s and %s). This " "different modules (%s and %s). This "
"is not supposed to happen! You may " "is not supposed to happen! You may "
"need to manually delete your cache " "need to manually delete your cache "
"directory to fix this.", "directory to fix this.",
self.entry_from_key[key], self.entry_from_key[key],
entry)
# Clean up the name space to prevent bug.
if key_data.keys:
del key
self.loaded_key_pkl.add(key_pkl)
else:
too_old_to_use.append(entry)
# If the compilation failed, no key.pkl is in that
# directory, but a mod.* should be there.
# We do nothing here.
# Clean up the name space to prevent bug.
del root, files, subdirs
# Remove entries that are not in the filesystem.
items_copy = list(self.module_hash_to_key_data.iteritems())
for module_hash, key_data in items_copy:
entry = key_data.get_entry()
try:
# Test to see that the file is [present and] readable.
open(entry).close()
gone = False
except IOError:
gone = True
if gone:
# Assert that we did not have one of the deleted files
# loaded up and in use.
# If so, it should not have been deleted. This should be
# considered a failure of the OTHER process, that deleted
# it.
if entry in self.module_from_name:
_logger.warning("A module that was loaded by this "
"ModuleCache can no longer be read from file "
"%s... this could lead to problems.",
entry) entry)
del self.module_from_name[entry] # Clean up the name space to prevent bug.
if key_data.keys:
_logger.info("deleting ModuleCache entry %s", entry) del key
key_data.delete_keys_from(self.entry_from_key) self.loaded_key_pkl.add(key_pkl)
del self.module_hash_to_key_data[module_hash] else:
if key_data.keys and list(key_data.keys)[0][0]: too_old_to_use.append(entry)
# this is a versioned entry, so should have been on
# disk. Something weird happened to cause this, so we # If the compilation failed, no key.pkl is in that
# are responding by printing a warning, removing # directory, but a mod.* should be there.
# evidence that we ever saw this mystery key. # We do nothing here.
pkl_file_to_remove = key_data.key_pkl
if not key_data.key_pkl.startswith("/tmp"): # Clean up the name space to prevent bug.
# Under /tmp, file are removed periodically by the del root, files, subdirs
# os. So it is normal that this happen from time to
# time. # Remove entries that are not in the filesystem.
_logger.warning("Removing key file %s because the " items_copy = list(self.module_hash_to_key_data.iteritems())
"corresponding module is gone from the " for module_hash, key_data in items_copy:
"file system.", entry = key_data.get_entry()
pkl_file_to_remove) try:
self.loaded_key_pkl.remove(pkl_file_to_remove) # Test to see that the file is [present and] readable.
open(entry).close()
finally: gone = False
compilelock.release_lock() except IOError:
gone = True
if gone:
# Assert that we did not have one of the deleted files
# loaded up and in use.
# If so, it should not have been deleted. This should be
# considered a failure of the OTHER process, that deleted
# it.
if entry in self.module_from_name:
_logger.warning("A module that was loaded by this "
"ModuleCache can no longer be read from file "
"%s... this could lead to problems.",
entry)
del self.module_from_name[entry]
_logger.info("deleting ModuleCache entry %s", entry)
key_data.delete_keys_from(self.entry_from_key)
del self.module_hash_to_key_data[module_hash]
if key_data.keys and list(key_data.keys)[0][0]:
# this is a versioned entry, so should have been on
# disk. Something weird happened to cause this, so we
# are responding by printing a warning, removing
# evidence that we ever saw this mystery key.
pkl_file_to_remove = key_data.key_pkl
if not key_data.key_pkl.startswith("/tmp"):
# Under /tmp, files are removed periodically by the
# os. So it is normal that this happens from time to
# time.
_logger.warning("Removing key file %s because the "
"corresponding module is gone from the "
"file system.",
pkl_file_to_remove)
self.loaded_key_pkl.remove(pkl_file_to_remove)
if to_delete:
with compilelock.lock_ctx():
for a, kw in to_delete:
_rmtree(*a, **kw)
_logger.debug('Time needed to refresh cache: %s', _logger.debug('Time needed to refresh cache: %s',
(time.time() - start_time)) (time.time() - start_time))
...@@ -919,18 +927,13 @@ class ModuleCache(object): ...@@ -919,18 +927,13 @@ class ModuleCache(object):
"previous one") "previous one")
key_data = self.module_hash_to_key_data[module_hash] key_data = self.module_hash_to_key_data[module_hash]
module = self._get_from_key(None, key_data) module = self._get_from_key(None, key_data)
lock_taken = False with compilelock.lock_ctx(keep_lock=keep_lock):
try: try:
compilelock.get_lock() key_data.add_key(key, save_pkl=bool(key[0]))
lock_taken = True key_broken = False
key_data.add_key(key, save_pkl=bool(key[0])) except cPickle.PicklingError:
key_broken = False key_data.remove_key(key)
except cPickle.PicklingError: key_broken = True
key_data.remove_key(key)
key_broken = True
finally:
if lock_taken and not keep_lock:
compilelock.release_lock()
if (key[0] and not key_broken and if (key[0] and not key_broken and
self.check_for_broken_eq): self.check_for_broken_eq):
self.check_key(key, key_data.key_pkl) self.check_key(key, key_data.key_pkl)
...@@ -1026,16 +1029,11 @@ class ModuleCache(object): ...@@ -1026,16 +1029,11 @@ class ModuleCache(object):
if module is not None: if module is not None:
return module return module
try: with compilelock.lock_ctx(keep_lock=keep_lock):
# Compile the module since it's not cached
compilelock.get_lock()
lock_taken = True
# Maybe somebody else compiled it for us while we # Maybe somebody else compiled it for us while we
# where waiting for the lock. Try to load it again # where waiting for the lock. Try to load it again
self.refresh(cleanup=False)
# Need no_clean_empty otherwise it deletes the workdir we
# created above.
self.refresh()
module = self._get_from_key(key) module = self._get_from_key(key)
if module is not None: if module is not None:
return module return module
...@@ -1073,10 +1071,6 @@ class ModuleCache(object): ...@@ -1073,10 +1071,6 @@ class ModuleCache(object):
key_data = self._add_to_cache(module, key, module_hash) key_data = self._add_to_cache(module, key, module_hash)
self.module_hash_to_key_data[module_hash] = key_data self.module_hash_to_key_data[module_hash] = key_data
finally:
# Release lock if needed.
if not keep_lock and lock_taken:
compilelock.release_lock()
self.stats[2] += 1 self.stats[2] += 1
return module return module
...@@ -1159,8 +1153,7 @@ class ModuleCache(object): ...@@ -1159,8 +1153,7 @@ class ModuleCache(object):
else: else:
age_thresh_use = None age_thresh_use = None
compilelock.get_lock() with compilelock.lock_ctx():
try:
# Update the age of modules that have been accessed by other # Update the age of modules that have been accessed by other
# processes and get all module that are too old to use # processes and get all module that are too old to use
# (not loaded in self.entry_from_key). # (not loaded in self.entry_from_key).
...@@ -1179,9 +1172,6 @@ class ModuleCache(object): ...@@ -1179,9 +1172,6 @@ class ModuleCache(object):
_rmtree(parent, msg='old cache directory', level=logging.INFO, _rmtree(parent, msg='old cache directory', level=logging.INFO,
ignore_nocleanup=True) ignore_nocleanup=True)
finally:
compilelock.release_lock()
def clear(self, unversioned_min_age=None, clear_base_files=False, def clear(self, unversioned_min_age=None, clear_base_files=False,
delete_if_problem=False): delete_if_problem=False):
""" """
...@@ -1198,16 +1188,13 @@ class ModuleCache(object): ...@@ -1198,16 +1188,13 @@ class ModuleCache(object):
:param delete_if_problem: See help of refresh() method. :param delete_if_problem: See help of refresh() method.
""" """
compilelock.get_lock() with compilelock.lock_ctx():
try:
self.clear_old( self.clear_old(
age_thresh_del=-1.0, age_thresh_del=-1.0,
delete_if_problem=delete_if_problem) delete_if_problem=delete_if_problem)
self.clear_unversioned(min_age=unversioned_min_age) self.clear_unversioned(min_age=unversioned_min_age)
if clear_base_files: if clear_base_files:
self.clear_base_files() self.clear_base_files()
finally:
compilelock.release_lock()
def clear_base_files(self): def clear_base_files(self):
""" """
...@@ -1219,8 +1206,7 @@ class ModuleCache(object): ...@@ -1219,8 +1206,7 @@ class ModuleCache(object):
rename them with the '.delete.me' extension, to mark them to be deleted rename them with the '.delete.me' extension, to mark them to be deleted
next time we clear the cache. next time we clear the cache.
""" """
compilelock.get_lock() with compilelock.lock_ctx():
try:
for base_dir in ('cuda_ndarray', 'cutils_ext', 'lazylinker_ext', for base_dir in ('cuda_ndarray', 'cutils_ext', 'lazylinker_ext',
'scan_perform'): 'scan_perform'):
to_delete = os.path.join(self.dirname, base_dir + '.delete.me') to_delete = os.path.join(self.dirname, base_dir + '.delete.me')
...@@ -1238,8 +1224,6 @@ class ModuleCache(object): ...@@ -1238,8 +1224,6 @@ class ModuleCache(object):
except Exception: except Exception:
_logger.warning('Could not move %s to %s', _logger.warning('Could not move %s to %s',
to_rename, to_delete) to_rename, to_delete)
finally:
compilelock.release_lock()
def clear_unversioned(self, min_age=None): def clear_unversioned(self, min_age=None):
""" """
...@@ -1254,9 +1238,8 @@ class ModuleCache(object): ...@@ -1254,9 +1238,8 @@ class ModuleCache(object):
if min_age is None: if min_age is None:
min_age = self.age_thresh_del_unversioned min_age = self.age_thresh_del_unversioned
compilelock.get_lock() with compilelock.lock_ctx():
all_key_datas = self.module_hash_to_key_data.values() all_key_datas = self.module_hash_to_key_data.values()
try:
for key_data in all_key_datas: for key_data in all_key_datas:
if not key_data.keys: if not key_data.keys:
# May happen for broken versioned keys. # May happen for broken versioned keys.
...@@ -1329,17 +1312,12 @@ class ModuleCache(object): ...@@ -1329,17 +1312,12 @@ class ModuleCache(object):
_rmtree(os.path.join(self.dirname, filename), _rmtree(os.path.join(self.dirname, filename),
msg='old unversioned', level=logging.INFO, msg='old unversioned', level=logging.INFO,
ignore_nocleanup=True) ignore_nocleanup=True)
finally:
compilelock.release_lock()
def _on_atexit(self): def _on_atexit(self):
# Note: no need to call refresh() since it is called by clear_old(). # Note: no need to call refresh() since it is called by clear_old().
compilelock.get_lock() with compilelock.lock_ctx():
try:
self.clear_old() self.clear_old()
self.clear_unversioned() self.clear_unversioned()
finally:
compilelock.release_lock()
_logger.debug('Time spent checking keys: %s', _logger.debug('Time spent checking keys: %s',
self.time_spent_in_check_key) self.time_spent_in_check_key)
......
...@@ -7,6 +7,7 @@ import random ...@@ -7,6 +7,7 @@ import random
import socket # only used for gethostname() import socket # only used for gethostname()
import time import time
import logging import logging
import contextlib
from theano import config from theano import config
from theano.configparser import AddConfigVar, IntParam from theano.configparser import AddConfigVar, IntParam
...@@ -44,6 +45,14 @@ def force_unlock(): ...@@ -44,6 +45,14 @@ def force_unlock():
release_lock() release_lock()
@contextlib.contextmanager
def lock_ctx(lock_dir=None, keep_lock=False, **kw):
get_lock(lock_dir=lock_dir, **kw)
yield
if not keep_lock:
release_lock()
def get_lock(lock_dir=None, **kw): def get_lock(lock_dir=None, **kw):
""" """
Obtain lock on compilation directory. Obtain lock on compilation directory.
......
File mode changed from 100755 to 100644
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论