提交 d4e7e4f1 authored 作者: Li's avatar Li 提交者: Frederic

support: no need to re-optimize when unpickling theano function. Added a new…

support: no need to re-optimize when unpickling theano function. Added a new option reoptimize_unpickled_function=False
上级 54bc8d12
......@@ -703,6 +703,7 @@ class Function(object):
# pickling/deepcopy support for Function
def _pickle_Function(f):
print 'pickling Function..'
#copy of the input storage list
ins = list(f.input_storage)
input_storage = []
......@@ -736,11 +737,11 @@ def _pickle_Function(f):
' operation') %(str(d_i), str(d_j)))
else:
raise AliasedMemoryError(d_i, d_j)
rval = (_constructor_Function, (f.maker, input_storage, inputs_data))
return rval
def _constructor_Function(maker, input_storage, inputs_data):
print 'unpickling Function...'
if not theano.config.unpickle_function:
return None
f = maker.create(input_storage, trustme = True)
......@@ -969,10 +970,180 @@ class FunctionMaker(object):
return SymbolicOutput(output)
else:
raise TypeError("Unknown output type: %s (%s)", type(output), output)
def retrieve_fgraph_from_opt_cache():
    """Stub for the (unfinished) on-disk optimization cache.

    Intended behavior, once implemented: look up the current unoptimized
    graph in a pickled dictionary of previously optimized graphs
    (``optimized_graphs.pkl`` in the compiledir) and return the cached
    optimized fgraph instead of re-running the optimizer.

    :raises NotImplementedError: always — the feature is not finished and
        this function must not be called.

    NOTE(review): the original body contained ~145 lines of cache-lookup
    code *after* the unconditional raise below.  That code was unreachable
    dead code (and referenced names such as ``inputs``, ``outputs``,
    ``fgraph`` and ``optimizer`` that are not in this function's scope);
    it duplicated the cache logic embedded in ``FunctionMaker.__init__``
    and has been removed.  See the ``theano.config.cache_optimizations``
    branch of ``FunctionMaker.__init__`` for the live copy.
    """
    # This function is not finished
    raise NotImplementedError('optimization cache is not finished! Should not be called.')
def __init__(self, inputs, outputs,
mode=None, accept_inplace=False, function_builder=Function,
profile=None, on_unused_input=None):
profile=None, on_unused_input=None, fgraph=None):
"""
:type inputs: a list of SymbolicInput instances
......@@ -1039,7 +1210,6 @@ class FunctionMaker(object):
inputs = [inputs]
# Wrap them in In or Out instances if needed.
#import pudb; pudb.set_trace()
inputs, outputs = map(self.wrap_in, inputs), map(self.wrap_out, outputs)
_inputs = gof.graph.inputs([o.variable for o in outputs] + [i.update
for i in inputs if getattr(i, 'update', False)])
......@@ -1051,211 +1221,56 @@ class FunctionMaker(object):
# tuple for each input. (See Function.indices for more details)
indices = [[input] + self.expand_in(input, _inputs) for input in inputs]
# make the fgraph (copies the graph, creates NEW INPUT AND OUTPUT VARIABLES)
fgraph, additional_outputs = std_fgraph(inputs, outputs, accept_inplace)
fgraph.profile = profile
if fgraph is None:
need_opt = True
else:
need_opt = False
if fgraph is None:
# make the fgraph (copies the graph, creates NEW INPUT AND OUTPUT VARIABLES)
fgraph, additional_outputs = std_fgraph(inputs, outputs, accept_inplace)
fgraph.profile = profile
else:
# fgraph is already an optimized one
_, additional_outputs = std_fgraph(inputs, outputs, accept_inplace)
pass
self.fgraph = fgraph
# Fetch the optimizer and linker
optimizer, linker = mode.optimizer, copy.copy(mode.linker)
# optimize the fgraph
compute_test_value_orig = theano.config.compute_test_value
add_stack_trace_on_call = gof.Op.add_stack_trace_on_call
try:
theano.config.compute_test_value = theano.config.compute_test_value_opt
gof.Op.add_stack_trace_on_call = False
from theano.gof.compilelock import get_lock, release_lock
import os.path
graph_db_file = os.path.join(theano.config.compiledir, 'optimized_graphs.pkl')
# the inputs, outputs, and size of the graph to be optimized
inputs_new = [inp.variable for inp in inputs]
outputs_new = [out.variable for out in outputs]
size_new = len(fgraph.apply_nodes)
need_optimize = False
get_lock()
key = None
#Beginning of cache optimizations.
#Could be refactored in different functions.
if theano.config.cache_optimizations: #set to false by default
'''
graph_db and need_optimize
'''
if os.path.isfile(graph_db_file):
print 'graph_db exists'
else:
# create graph_db
f = open(graph_db_file, 'wb')
print 'created new graph_db %s' % graph_db_file
#file needs to be open and closed for every pickle
f.close()
# load the graph_db dictionary
try:
f = open(graph_db_file, 'rb')
#Temporary hack to allow theano.scan_module.tests.test_scan.T_Scan
#to finish. Should be changed in definitive version.
tmp = theano.config.unpickle_function
theano.config.unpickle_function = False
graph_db = cPickle.load(f)
theano.config.unpickle_function = tmp
#hack end
f.close()
print 'graph_db is not empty'
except EOFError, e:
# the file has nothing in it
print e
print 'graph_db is empty'
graph_db = {}
need_optimize = True
print 'loaded graph_db from %s, size=%d' % (graph_db_file, len(graph_db))
# the sole purpose of this loop is to set 'need_optimize'
for i, graph_old in enumerate(graph_db.keys()):
inputs_old = graph_old.inputs
outputs_old = graph_old.outputs
size_old = len(graph_old.apply_nodes)
print 'looping through graph_db %d/%d' % (i + 1, len(graph_db))
# Some heuristics to check is the same graphs have
# already been optimized before.
if len(inputs_new) != len(inputs_old):
# If the inputs are of different size,
# two graphs are for sure different
print 'need to optimize, because input size is different'
continue
elif len(outputs_new) != len(outputs_old):
# If the inputs are of different size,
# two graphs are for sure different
print 'need to optimize, because output size is different'
continue
elif not all(input_new.type == input_old.type for
input_new, input_old in zip(inputs_new, inputs_old)):
print 'need to optimize, because inputs are of different types'
continue
elif not all(output_new.type == output_old.type for
output_new, output_old in zip(outputs_new, outputs_old)):
print 'need to optimize, because outputs are of different types'
continue
elif not size_old == size_new:
print 'need to optimize, because numbers of nodes in graph are different'
continue
else:
flags = []
for output_new, output_old, i in zip(outputs_new, outputs_old, range(len(outputs_new))):
print 'loop through outputs node for both graphs'
graph_old.variables = set(gof.graph.variables(graph_old.inputs, graph_old.outputs))
#using clone allowed to avoid a lot of errors
#deep copy seemed to had.
f2 = graph_old.clone(check_integrity=False)
t1 = output_new
t2 = f2.outputs[i]
#Used to remove "already used by another graph error
def removeAllFgraph(remove):
if hasattr(remove, 'fgraph'):
del remove.fgraph
if hasattr(remove, 'owner'):
if remove.owner == None:
pass
else:
if hasattr(remove.owner, 'fgraph'):
del remove.owner.fgraph
if hasattr(remove.owner, 'inputs'):
remove.owner.inputs = [removeAllFgraph(
i) for i in remove.owner.inputs]
for o in remove.owner.outputs:
if hasattr(o, 'fgraph'):
del o.fgraph
return remove
t2 = removeAllFgraph(t2)
givens = dict(zip(gof.graph.inputs([t1]),
gof.graph.inputs([t2])))
temp = dict(zip(gof.graph.inputs([t1]),
gof.graph.inputs([t2])))
#hack to remove inconstent entry in givens
#seems to work that but source of inconsistency
#could be worth investigating.
for key, value in temp.iteritems():
if key.type != value.type:
del givens[key]
flag = is_same_graph(t1, t2, givens=givens)
flags.append(flag)
is_same = all(flags)
if is_same:
# found the match
print 'found #TODO: he match, no need to optimize'
need_optimize = False
key = graph_old
break
# now optimize or not
if need_optimize:
# this is a brand new graph, optimize it, save it to graph_db
print 'optimizing the graph'
fgraph.variables = set(gof.graph.variables(fgraph.inputs, fgraph.outputs))
#check_integrity parameters was added to ignore
#"excess cached variables" errors. Works that way
#but once again the error couldbe worth
#investigating.
before_opt = fgraph.clone(check_integrity=False)
start_optimizer = time.time()
optimizer_profile = optimizer(fgraph)
end_optimizer = time.time()
opt_time = end_optimizer - start_optimizer
graph_db.update({before_opt:fgraph})
f = open(graph_db_file, 'wb')
cPickle.dump(graph_db, f, -1)
f.close()
print 'saved into graph_db'
else:
print 'no opt, get graph from graph_db'
# just read the optmized graph from graph_db
opt_time = 0
#"Naive" insertion. It's seems to work, but there may
#be some problems inserting it like that.
self.fgraph = graph_db[key]
fgraph = self.fgraph
# release stuff
release_lock()
#end of cache optimization
#else containing the old code
else:
if need_opt:
# optimize the fgraph
print 'fgraph is optimized'
try:
theano.config.compute_test_value = theano.config.compute_test_value_opt
gof.Op.add_stack_trace_on_call = False
start_optimizer = time.time()
optimizer_profile = optimizer(fgraph)
end_optimizer = time.time()
opt_time = end_optimizer - start_optimizer
#print 'opt took %s' % opt_time
if profile:
profile.optimizer_time += opt_time
if theano.config.profile_optimizer:
profile.optimizer_profile = (optimizer, optimizer_profile)
_logger.debug('Optimizing took %f seconds', opt_time)
#Add deep copy to respect the memory interface
insert_deepcopy(fgraph, inputs, outputs + additional_outputs)
finally:
if profile:
profile.optimizer_time += opt_time
if theano.config.profile_optimizer:
profile.optimizer_profile = (optimizer, optimizer_profile)
_logger.debug('Optimizing took %f seconds', opt_time)
#Add deep copy to respect the memory interface
insert_deepcopy(fgraph, inputs, outputs + additional_outputs)
finally:
theano.config.compute_test_value = compute_test_value_orig
gof.Op.add_stack_trace_on_call = add_stack_trace_on_call
else:
# fgraph is already optimized
print 'fgraph is not optimized'
theano.config.compute_test_value = compute_test_value_orig
gof.Op.add_stack_trace_on_call = add_stack_trace_on_call
# initialize the linker
if not hasattr(linker, 'accept'):
raise ValueError("'linker' parameter of FunctionMaker should be a Linker with an accept method " \
......@@ -1415,9 +1430,11 @@ class FunctionMaker(object):
def _pickle_FunctionMaker(self):
'picking FunctionMaker'
kwargs = dict(
inputs=self.inputs,
outputs=self.orig_outputs,
fgraph=self.fgraph,
mode=self.mode,
accept_inplace=self.accept_inplace,
function_builder=self.function_builder,
......@@ -1428,7 +1445,10 @@ def _pickle_FunctionMaker(self):
def _constructor_FunctionMaker(kwargs):
    # Pickle reconstructor for FunctionMaker: rebuilds the maker from the
    # kwargs dict captured by _pickle_FunctionMaker.
    print 'unpickling FunctionMaker...'
    if theano.config.unpickle_function:
        if theano.config.reoptimize_unpickled_function:
            # Discard the pickled (already optimized) fgraph so that
            # FunctionMaker.__init__ re-runs the optimizer from scratch;
            # otherwise the cached optimized graph is reused as-is.
            del kwargs['fgraph']
        return FunctionMaker(**kwargs)
    else:
        # unpickle_function=False: skip reconstruction entirely.
        return None
......
......@@ -118,6 +118,7 @@ AddConfigVar('print_active_device',
BoolParam(True, allow_override=False),
in_c_key=False)
# Do not add FAST_RUN_NOGC to this list (nor any other ALL CAPS shortcut).
# The way to get FAST_RUN_NOGC is with the flag 'linker=c|py_nogc'.
# The old all capital letter way of working is deprecated as it is not
......@@ -465,6 +466,12 @@ AddConfigVar('unpickle_function',
BoolParam(True),
in_c_key=False)
# New config flag introduced by this commit: when False (the default),
# a theano function unpickled from disk reuses its pickled, already
# optimized fgraph instead of re-running graph optimization.
AddConfigVar('reoptimize_unpickled_function',
        "Re-optimize the graph when a theano function is unpickled from the disk.",
        BoolParam(False, allow_override=False),
        in_c_key=False)
"""Note to developers:
Generally your exceptions should use an apply node's __str__
method when exception_verbosity == 'low'. When exception_verbosity
......
......@@ -8,7 +8,7 @@ floatX = 'float32'
def test_pickle_unpickle():
# Test if pick and unpickling a theano function with
# shared variables work
# shared variables should be pickled properly
x1 = T.fmatrix('x1')
x2 = T.fmatrix('x2')
x3 = theano.shared(numpy.ones((10,10),dtype=floatX))
......@@ -19,12 +19,12 @@ def test_pickle_unpickle():
updates[x3] = x3 + 1
updates[x4] = x4 + 1
f = theano.function([x1,x2],y, updates=updates)
pkl_path = open('thean_fn.pkl','wb')
cPickle.dump(f, pkl_path, -1)
pkl_path = open('thean_fn.pkl','r')
f_ = cPickle.load(pkl_path)
import ipdb; ipdb.set_trace()
in1 = numpy.ones((10, 10), dtype=floatX)
in2 = numpy.ones((10, 10), dtype=floatX)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论