提交 9ddb07e2 authored 作者: Pascal Lamblin's avatar Pascal Lamblin

Merge pull request #1548 from nouiz/compiledir

Fix infinite loop when we can't create the lock_dir directory.
# Locking mechanism to ensure no two compilations occur simultaneously in the # Locking mechanism to ensure no two compilations occur simultaneously
# same compilation directory (which can cause crashes). # in the same compilation directory (which can cause crashes).
from theano import config import atexit
import os, random, time, atexit import os
import random
import socket # only used for gethostname() import socket # only used for gethostname()
import time
import logging import logging
_logger=logging.getLogger("theano.gof.compilelock")
_logger.setLevel(logging.INFO) # INFO will show the the messages "Refreshing lock" message from theano import config
_logger = logging.getLogger("theano.gof.compilelock")
# INFO will show the "Refreshing lock" messages
_logger.setLevel(logging.INFO)
# In seconds, time that a process will wait before deciding to override an # In seconds, time that a process will wait before deciding to override an
# existing lock. An override only happens when the existing lock is held by # existing lock. An override only happens when the existing lock is held by
...@@ -82,6 +88,7 @@ def get_lock(lock_dir=None, **kw): ...@@ -82,6 +88,7 @@ def get_lock(lock_dir=None, **kw):
get_lock.start_time = now get_lock.start_time = now
get_lock.n_lock += 1 get_lock.n_lock += 1
def release_lock(): def release_lock():
""" """
Release lock on compilation directory. Release lock on compilation directory.
...@@ -93,6 +100,7 @@ def release_lock(): ...@@ -93,6 +100,7 @@ def release_lock():
get_lock.start_time = None get_lock.start_time = None
get_lock.unlocker.unlock() get_lock.unlocker.unlock()
def set_lock_status(use_lock): def set_lock_status(use_lock):
""" """
Enable or disable the lock on the compilation directory (which is enabled Enable or disable the lock on the compilation directory (which is enabled
...@@ -104,6 +112,7 @@ def set_lock_status(use_lock): ...@@ -104,6 +112,7 @@ def set_lock_status(use_lock):
""" """
get_lock.lock_is_enabled = use_lock get_lock.lock_is_enabled = use_lock
def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1): def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1):
""" """
Obtain lock access by creating a given temporary directory (whose base will Obtain lock access by creating a given temporary directory (whose base will
...@@ -157,6 +166,7 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1): ...@@ -157,6 +166,7 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1):
no_display = (verbosity == 0) no_display = (verbosity == 0)
# Acquire lock. # Acquire lock.
nb_error = 0
while True: while True:
try: try:
last_owner = 'no_owner' last_owner = 'no_owner'
...@@ -211,7 +221,8 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1): ...@@ -211,7 +221,8 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1):
msg = "process '%s'" % read_owner.split('_')[0] msg = "process '%s'" % read_owner.split('_')[0]
_logger.info("Waiting for existing lock by %s (I am " _logger.info("Waiting for existing lock by %s (I am "
"process '%s')", msg, my_pid) "process '%s')", msg, my_pid)
_logger.info("To manually release the lock, delete %s", tmp_dir) _logger.info("To manually release the lock, delete %s",
tmp_dir)
if verbosity <= 1: if verbosity <= 1:
no_display = True no_display = True
time.sleep(random.uniform(min_wait, max_wait)) time.sleep(random.uniform(min_wait, max_wait))
...@@ -219,9 +230,13 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1): ...@@ -219,9 +230,13 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1):
try: try:
os.mkdir(tmp_dir) os.mkdir(tmp_dir)
except OSError: except OSError:
# Error while creating the directory: someone else must have tried # Error while creating the directory: someone else
# at the exact same time. # must have tried at the exact same time.
nb_error += 1
if nb_error < 10:
continue continue
else:
raise
# Safety check: the directory should be here. # Safety check: the directory should be here.
assert os.path.isdir(tmp_dir) assert os.path.isdir(tmp_dir)
...@@ -241,22 +256,28 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1): ...@@ -241,22 +256,28 @@ def lock(tmp_dir, timeout=120, min_wait=5, max_wait=10, verbosity=1):
except Exception, e: except Exception, e:
# If something wrong happened, we try again. # If something wrong happened, we try again.
_logger.warning("Something wrong happened: %s %s", type(e), e) _logger.warning("Something wrong happened: %s %s", type(e), e)
nb_error += 1
if nb_error > 10:
raise
time.sleep(random.uniform(min_wait, max_wait)) time.sleep(random.uniform(min_wait, max_wait))
continue continue
def refresh_lock(lock_file):
    """
    'Refresh' an existing lock by re-writing the file containing the owner's
    unique id, using a new (randomly generated) id, which is also returned.

    :param lock_file: Path of the file that stores the owner's unique id.
        It is opened in 'w' mode, so any previous content is replaced.

    :returns: The new unique id, a string of the form
        '<pid>_<10 random digits>_<hostname>'.
    """
    unique_id = '%s_%s_%s' % (
        os.getpid(),
        ''.join([str(random.randint(0, 9)) for i in range(10)]),
        socket.gethostname())
    lock_write = open(lock_file, 'w')
    try:
        lock_write.write(unique_id + '\n')
    finally:
        # Always release the file handle, even when the write fails, so a
        # failed refresh does not leak an open descriptor on the lock file.
        lock_write.close()
    return unique_id
class Unlocker(object): class Unlocker(object):
""" """
Class wrapper around release mechanism so that the lock is automatically Class wrapper around release mechanism so that the lock is automatically
...@@ -274,12 +295,14 @@ class Unlocker(object): ...@@ -274,12 +295,14 @@ class Unlocker(object):
self.unlock() self.unlock()
def unlock(self): def unlock(self):
""" """Remove current lock.
Remove current lock.
This function does not crash if it is unable to properly delete the lock This function does not crash if it is unable to properly
file and directory. The reason is that it should be allowed for multiple delete the lock file and directory. The reason is that it
jobs running in parallel to unlock the same directory at the same time should be allowed for multiple jobs running in parallel to
(e.g. when reaching their timeout limit). unlock the same directory at the same time (e.g. when reaching
their timeout limit).
""" """
# If any error occurs, we assume this is because someone else tried to # If any error occurs, we assume this is because someone else tried to
# unlock this directory at the same time. # unlock this directory at the same time.
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论