提交 964dbdd7 authored 作者: Olivier Delalleau's avatar Olivier Delalleau

Using new compilation directory lock to make parallel execution safer

上级 36233c33
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
Defines Linkers that deal with C implementations. Defines Linkers that deal with C implementations.
""" """
# Python imports # Python imports
from copy import copy from copy import copy
import md5 import md5
...@@ -20,6 +19,7 @@ import link ...@@ -20,6 +19,7 @@ import link
import utils import utils
from compiledir import * from compiledir import *
from compilelock import get_lock, release_lock
class CodeBlock: class CodeBlock:
"""WRITEME """WRITEME
...@@ -301,7 +301,6 @@ def struct_variable_codeblocks(variable, policies, id, symbol_table, sub): ...@@ -301,7 +301,6 @@ def struct_variable_codeblocks(variable, policies, id, symbol_table, sub):
return struct_builder, block return struct_builder, block
class CLinker(link.Linker): class CLinker(link.Linker):
"""WRITEME """WRITEME
...@@ -616,8 +615,17 @@ class CLinker(link.Linker): ...@@ -616,8 +615,17 @@ class CLinker(link.Linker):
f() f()
first_output = ostor[0].data first_output = ostor[0].data
""" """
cthunk, in_storage, out_storage, error_storage = self.__compile__(input_storage, output_storage) # Note: acquiring the lock here may not be necessary. However, it is
return _execute(cthunk, self.init_tasks, self.tasks, error_storage), in_storage, out_storage # cheap enough that it should not matter.
get_lock()
try:
cthunk, in_storage, out_storage, error_storage = self.__compile__(input_storage, output_storage)
res = _execute(cthunk, self.init_tasks, self.tasks, error_storage), in_storage, out_storage
except:
release_lock()
raise
release_lock()
return res
def cthunk_factory(self, error_storage, in_storage, out_storage): def cthunk_factory(self, error_storage, in_storage, out_storage):
"""WRITEME """WRITEME
...@@ -632,98 +640,112 @@ class CLinker(link.Linker): ...@@ -632,98 +640,112 @@ class CLinker(link.Linker):
type, value and traceback of the exception in error_storage. type, value and traceback of the exception in error_storage.
""" """
# check if we already compiled this get_lock()
if not getattr(self, 'instantiate', False):
try:
self.code_gen()
module_name = self.hash # check if we already compiled this
if not getattr(self, 'instantiate', False):
# Eliminate duplicate inputs and outputs from the storage that we will pass to instantiate
out_storage = [x for i, x in enumerate(out_storage) if (i+len(in_storage)) not in self.dupidx]
in_storage = [x for i, x in enumerate(in_storage) if i not in self.dupidx] self.code_gen()
module_name = self.hash
cthunk = object() # dummy so weave can get the type
mod = weave.ext_tools.ext_module(module_name) # Eliminate duplicate inputs and outputs from the storage that we will pass to instantiate
out_storage = [x for i, x in enumerate(out_storage) if (i+len(in_storage)) not in self.dupidx]
argnames = ["i%i" % i for i in xrange(len(in_storage))] \ in_storage = [x for i, x in enumerate(in_storage) if i not in self.dupidx]
+ ["o%i" % i for i in xrange(len(out_storage))] \
+ ["orph%i" % i for i in xrange(len(self.orphans))] cthunk = object() # dummy so weave can get the type
mod = weave.ext_tools.ext_module(module_name)
# The code of instantiate
code = """ argnames = ["i%i" % i for i in xrange(len(in_storage))] \
%(struct_name)s* struct_ptr = new %(struct_name)s(); + ["o%i" % i for i in xrange(len(out_storage))] \
struct_ptr->init(error_storage, %(args)s); + ["orph%i" % i for i in xrange(len(self.orphans))]
PyObject* thunk = PyCObject_FromVoidPtrAndDesc((void*)(&%(struct_name)s_executor), struct_ptr, %(struct_name)s_destructor);
return thunk; # The code of instantiate
// return_val = thunk; // oh my god weave why does this leak >:\ code = """
""" % dict(struct_name = self.struct_name, %(struct_name)s* struct_ptr = new %(struct_name)s();
args = ", ".join(argnames)) struct_ptr->init(error_storage, %(args)s);
PyObject* thunk = PyCObject_FromVoidPtrAndDesc((void*)(&%(struct_name)s_executor), struct_ptr, %(struct_name)s_destructor);
d = dict(error_storage = object()) return thunk;
for argname in argnames: // return_val = thunk; // oh my god weave why does this leak >:\
d[argname] = object() """ % dict(struct_name = self.struct_name,
args = ", ".join(argnames))
instantiate = weave.ext_tools.ext_function('instantiate',
code, d = dict(error_storage = object())
['error_storage'] + argnames, for argname in argnames:
local_dict = d, d[argname] = object()
global_dict = {})
instantiate = weave.ext_tools.ext_function('instantiate',
# Static methods that can run and destroy the struct built by instantiate. code,
static = """ ['error_storage'] + argnames,
int %(struct_name)s_executor(%(struct_name)s* self) { local_dict = d,
return self->run(); global_dict = {})
}
# Static methods that can run and destroy the struct built by instantiate.
void %(struct_name)s_destructor(void* executor, void* self) { static = """
//printf("doing cleanup\\n"); int %(struct_name)s_executor(%(struct_name)s* self) {
//fflush(stdout); return self->run();
((%(struct_name)s*)self)->cleanup(); }
free(self);
//printf("done cleanup\\n"); void %(struct_name)s_destructor(void* executor, void* self) {
//fflush(stdout); //printf("doing cleanup\\n");
} //fflush(stdout);
""" % dict(struct_name = self.struct_name) ((%(struct_name)s*)self)->cleanup();
free(self);
# We add all the support code, compile args, headers and libs we need. //printf("done cleanup\\n");
for support_code in self.support_code(): //fflush(stdout);
instantiate.customize.add_support_code(support_code) }
instantiate.customize.add_support_code(self.struct_code) """ % dict(struct_name = self.struct_name)
instantiate.customize.add_support_code(static)
for extra_arg in ( # We add all the support code, compile args, headers and libs we need.
"-O2", for support_code in self.support_code():
"-ffast-math", instantiate.customize.add_support_code(support_code)
#"-fprefetch-loop-arrays", instantiate.customize.add_support_code(self.struct_code)
#"-ftree-vect-loop-version", instantiate.customize.add_support_code(static)
#"-ftree-loop-optimize", for extra_arg in (
#"-ftree-vectorize"): "-O2",
"-w" #-w means supress all warnings "-ffast-math",
): #"-fprefetch-loop-arrays",
instantiate.customize.add_extra_compile_arg(extra_arg) #"-ftree-vect-loop-version",
for arg in self.compile_args(): #"-ftree-loop-optimize",
instantiate.customize.add_extra_compile_arg(arg) #"-ftree-vectorize"):
for header in self.headers(): "-w" #-w means supress all warnings
instantiate.customize.add_header(header) ):
for lib in self.libraries(): instantiate.customize.add_extra_compile_arg(extra_arg)
instantiate.customize.add_library(lib) for arg in self.compile_args():
instantiate.customize.add_extra_compile_arg(arg)
mod.add_function(instantiate) for header in self.headers():
#mod.compile(location = compile_dir()) instantiate.customize.add_header(header)
mod.compile(location = get_compiledir()) for lib in self.libraries():
module = __import__("%s" % (module_name), {}, {}, [module_name]) instantiate.customize.add_library(lib)
self.instantiate = module.instantiate mod.add_function(instantiate)
else:
# Eliminate duplicate inputs and outputs from the storage that we will pass to instantiate mod.compile(location = get_compiledir())
out_storage = [x for i, x in enumerate(out_storage) if (i+len(in_storage)) not in self.dupidx]
in_storage = [x for i, x in enumerate(in_storage) if i not in self.dupidx] module = __import__("%s" % (module_name), {}, {}, [module_name])
module_name = self.hash
module = __import__("%s" % (module_name), {}, {}, [module_name]) self.instantiate = module.instantiate
orphd = [[orphan.data] for orphan in self.orphans]
ret = module.instantiate(error_storage, *(in_storage + out_storage + orphd)) else:
#win pdb add 3 ref count, so we disable it by default. # Eliminate duplicate inputs and outputs from the storage that we will pass to instantiate
#assert sys.getrefcount(ret) == 2 # refcount leak check out_storage = [x for i, x in enumerate(out_storage) if (i+len(in_storage)) not in self.dupidx]
in_storage = [x for i, x in enumerate(in_storage) if i not in self.dupidx]
module_name = self.hash
module = __import__("%s" % (module_name), {}, {}, [module_name])
orphd = [[orphan.data] for orphan in self.orphans]
ret = module.instantiate(error_storage, *(in_storage + out_storage + orphd))
#win pdb add 3 ref count, so we disable it by default.
#assert sys.getrefcount(ret) == 2 # refcount leak check
except:
release_lock()
raise
release_lock()
return ret return ret
...@@ -793,6 +815,10 @@ class OpWiseCLinker(link.LocalLinker): ...@@ -793,6 +815,10 @@ class OpWiseCLinker(link.LocalLinker):
return self return self
def make_all(self, profiler = None, input_storage = None, output_storage = None): def make_all(self, profiler = None, input_storage = None, output_storage = None):
# Acquire lock on compilation directory.
get_lock()
env = self.env env = self.env
order = env.toposort() order = env.toposort()
no_recycling = self.no_recycling no_recycling = self.no_recycling
...@@ -869,6 +895,9 @@ class OpWiseCLinker(link.LocalLinker): ...@@ -869,6 +895,9 @@ class OpWiseCLinker(link.LocalLinker):
f.allow_gc = self.allow_gc f.allow_gc = self.allow_gc
# Release lock on compilation directory.
release_lock()
return f, [link.Container(input, storage) for input, storage in zip(env.inputs, input_storage)], \ return f, [link.Container(input, storage) for input, storage in zip(env.inputs, input_storage)], \
[link.Container(output, storage, True) for output, storage in zip(env.outputs, output_storage)], \ [link.Container(output, storage, True) for output, storage in zip(env.outputs, output_storage)], \
thunks, order thunks, order
...@@ -960,8 +989,3 @@ class DualLinker(link.Linker): ...@@ -960,8 +989,3 @@ class DualLinker(link.Linker):
return f, i1, o1 return f, i1, o1
from compiledir import * from compiledir import *
from compilelock import get_lock, release_lock
import sys import sys
...@@ -30,8 +31,14 @@ except ImportError: ...@@ -30,8 +31,14 @@ except ImportError:
cthunk = object() cthunk = object()
mod = weave.ext_tools.ext_module('cutils_ext') mod = weave.ext_tools.ext_module('cutils_ext')
fun =weave.ext_tools.ext_function('run_cthunk', single_runner, ['cthunk']) fun = weave.ext_tools.ext_function('run_cthunk', single_runner, ['cthunk'])
fun.customize.add_extra_compile_arg('--permissive') fun.customize.add_extra_compile_arg('--permissive')
mod.add_function(fun) mod.add_function(fun)
mod.compile(location = get_compiledir()) get_lock()
try:
mod.compile(location = get_compiledir())
except:
release_lock()
raise
release_lock()
from cutils_ext import * from cutils_ext import *
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论